
Monitoring Apache Spark applications running on Amazon EMR

Author Priya Matpadi

Published: November 5, 2018

This is a guest post by Priya Matpadi, Principal Engineer at Lookout, a mobile-first security platform for protecting mobile endpoints, consumer-facing apps, and more. This post originally appeared on her blog.

Editor’s note: Apache uses the term “master” to describe its architecture and certain metrics. When possible, Datadog does not use this term. Except when referring to specific metric or configuration names for clarity, we will replace this word with “primary”.

We recently implemented a Spark streaming application, which consumes data from multiple Kafka topics. The data consumed from Kafka comprises different types of telemetry events generated by mobile devices. We decided to host the Spark cluster using the Amazon EMR service, which manages a fleet of EC2 instances to run our data-processing pipelines.

As part of preparing the cluster and application for deployment to production, we needed to implement monitoring so we could track the streaming application and the Spark infrastructure itself. At a high level, we wanted to ensure that we could monitor the different components of the application, understand performance parameters, and get alerted when things go wrong.

In this post, we’ll walk through how we aggregated relevant metrics in Datadog from our Spark streaming application running on a YARN cluster in EMR.

Why Datadog?

The Spark UI provides a pretty good dashboard to display useful information about the health of the running application. However, this tool provides only one angle on the kind of information you need for understanding your application in a production environment. And although metrics generated by EMR are automatically collected and pushed to Amazon’s CloudWatch service, this data is more focused on running MapReduce tasks on the YARN cluster, rather than Spark streaming applications.

Luckily, Datadog provides built-in integrations for monitoring both Amazon EMR and Spark. Moreover, we were already using Datadog monitoring for application and cluster monitoring, so monitoring our Spark streaming application in the same platform was a natural choice.

Integrating Datadog with EMR

Setting up the Datadog integration with EMR is pretty straightforward. All the steps and metrics you can graph are documented nicely here. Just make sure your Datadog account is linked to your relevant AWS account, and has permission to pull metrics. Here is what the process looks like, in short:

Step 1: Verify that Datadog is configured to collect data from your AWS account, and that ElasticMapReduce is selected in the list of available AWS integrations on the left.

Configuring the AWS integration in Datadog to enable the collection of EMR metrics

Step 2: Ensure the AWS role specified in the Configuration tab has List* and Describe* permissions for Elastic MapReduce.

Ensure that the AWS role for Datadog has the proper permissions to gather EMR metrics

To make it easier to filter and aggregate your metrics, you can apply tags to your EMR cluster using the AWS console:

Applying tags to data from an EMR cluster using the AWS console
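If you launch your clusters programmatically, as we do later in this post, you can also attach the same tags at cluster launch with the AWS SDK. Here is a minimal sketch using the EMR Java SDK from Scala; the tag keys and values are illustrative rather than the exact tags we use:

import com.amazonaws.services.elasticmapreduce.model.{RunJobFlowRequest, Tag}

// Minimal sketch: attach tags at launch so EMR metrics arrive in Datadog pre-tagged.
// The tag keys and values below are illustrative.
val clusterTags = Seq(
  new Tag().withKey("environment").withValue("prod"),
  new Tag().withKey("application").withValue("spark-streaming"))

val taggedRequest = new RunJobFlowRequest()
  .withName("telemetry-processing")   // illustrative cluster name
  .withTags(clusterTags: _*)          // withTags accepts a varargs list of Tag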

Within a few minutes, your cluster’s metrics should become available in Datadog.

Graphing EMR metrics in Datadog

Collecting Spark metrics in Datadog

Next, we’ll show you how you can set up your EMR cluster to publish Spark driver, executor, and RDD metrics about the Spark streaming app to Datadog. You can read more about Datadog’s Spark integration here.

To install the Datadog Agent on our cluster and enable the Agent’s Spark check, we leveraged EMR Bootstrap actions. From the AWS documentation:

You can use a bootstrap action to install additional software on your cluster. Bootstrap actions are scripts that are run on the cluster nodes when Amazon EMR launches the cluster. They run before Amazon EMR installs specified applications and the node begins processing data. If you add nodes to a running cluster, bootstrap actions run on those nodes also.

Setting up the Spark check on an EMR cluster is a two-step process, each executed by a separate script:

  1. Install the Datadog Agent on each node in the EMR cluster
  2. Configure the Datadog Agent on the primary node to run the Spark check at regular intervals and publish Spark metrics to Datadog

Examples of both scripts can be found below; they assume that both scripts have been uploaded to S3 under <s3-bucket-name>/bootstrap-actions/.

Install the Datadog Agent on EMR nodes

The first script, emr-bootstrap-datadog-install.sh, is launched by the bootstrap step during EMR launch. The script downloads and installs the Datadog Agent on each node of the cluster. Simple! It then executes the second script, emr-bootstrap-datadog-spark-check-setup.sh, as a background process. Note that the first script requires four positional arguments:

  1. A Datadog API key
  2. The name of the EMR cluster
  3. An environment name to apply as a tag
  4. The name of the S3 bucket containing the bootstrap scripts
#!/bin/bash

# Clean install Datadog agent
sudo yum -y erase datadog-agent
sudo rm -rf /etc/datadog-agent
sudo rm -rf /etc/dd-agent

# INPUT: Datadog account key
DD_API_KEY=$1
DD_AGENT_MAJOR_VERSION=7 bash -c "$(curl -L https://raw.githubusercontent.com/DataDog/datadog-agent/master/cmd/agent/install_script.sh)"

# INPUT: EMR Cluster name used to tag metrics
CLUSTER_NAME=$2
# INPUT: Env name e.g. stage or prod, used to tag metrics
INSTANCE_TAG=$3
# INPUT: S3 bucket name where spark check configuration script for datadog agent is uploaded
S3_BUCKET=$4
# Spark check configuration script path in above S3 bucket
S3_LOCATION_SPARK_CHECK_SETUP_SCRIPT="s3://${S3_BUCKET}/bootstrap-actions/"
SCRIPT_NAME="emr-bootstrap-datadog-spark-check-setup.sh"

# Copy the spark check configuration script from S3 to current path
aws s3 cp  ${S3_LOCATION_SPARK_CHECK_SETUP_SCRIPT}${SCRIPT_NAME} .
# Make the script executable
chmod +x ${SCRIPT_NAME}

# Bootstrap actions run before EMR installs and configures applications (YARN, Spark),
# and the Spark check setup depends on that configuration being in place.
# Run the setup script in the background so this bootstrap action can complete
# and software configuration can proceed.
./${SCRIPT_NAME} ${CLUSTER_NAME} ${INSTANCE_TAG} > spark_check.out 2>&1 &

Configure the Datadog Agent on the primary node

Why do we need to run the configuration step in a separate script? Remember that bootstrap actions are run before any application is installed on the EMR nodes. The first script installed new software (the Datadog Agent), but the second step requires that YARN and Spark are installed before the Datadog configuration can be completed.

yarn-site.xml does not exist at the time the Datadog Agent is installed. Hence we launch a background process to run the Spark check setup script. It waits until yarn-site.xml has been created and contains a value for the YARN property resourcemanager.hostname. Once that property is found, the script creates the spark.yaml file and moves it under /etc/datadog-agent/conf.d/spark.d/. It then sets the appropriate permissions on spark.yaml and restarts the Datadog Agent to load the configuration change.

#!/bin/bash

IS_MASTER=false
# instance.json exposes the node's role via the "isMaster" key
if [ $(grep "\"isMaster\": true" /mnt/var/lib/info/instance.json -wc) = 1 ]; then
  echo "Running on the primary node."
  IS_MASTER=true
fi
# Execute spark check configuration only on the primary node of the EMR cluster
if [ "$IS_MASTER" = true ]; then

  # Datadog-Spark Integration
  # https://docs.datadoghq.com/integrations/spark/

  YARN_SITE_XML_LOCATION="/etc/hadoop/conf/yarn-site.xml"
  YARN_PROPERTY="resourcemanager.hostname"
  DD_AGENT_CONF_DIR="/etc/datadog-agent/conf.d/spark.d/"
  SPARK_YAML_FILE="${DD_AGENT_CONF_DIR}/spark.yaml"

  # Commandline Parameters
  CLUSTER_NAME=$1
  INSTANCE_TAG=$2
  CLUSTER_NAME_WITH_ENV_SUFFIX=`echo ${CLUSTER_NAME}-${INSTANCE_TAG}`

  # Wait until yarn-site.xml is available
  while [ ! -f ${YARN_SITE_XML_LOCATION} ]
  do
    sleep 1
  done

  #Debug
  echo "DEBUG: Found: ${YARN_SITE_XML_LOCATION}"
  cat ${YARN_SITE_XML_LOCATION}

  # Wait until yarn-site.xml has expected content
  while [ -z "$(grep ${YARN_PROPERTY} ${YARN_SITE_XML_LOCATION})" ]
  do
    sleep 1
  done

  #Debug
  cat ${YARN_SITE_XML_LOCATION} | grep ${YARN_PROPERTY}

  # Read the Yarn resource manager hostname to create value for spark_url
  YARN_RM_HOSTNAME_RAW=`cat ${YARN_SITE_XML_LOCATION} | grep -A1 ${YARN_PROPERTY} | grep value`
  YARN_RM_HOSTNAME=`echo ${YARN_RM_HOSTNAME_RAW}|sed -e 's-value--g' -e 's-<--g' -e 's->--g' -e 's-\/-:-g'`
  SPARK_URL=`echo http://${YARN_RM_HOSTNAME}8088`

  #Debug
  echo "DEBUG: Constructed spark_url: ${SPARK_URL}"
  # Create the spark.yaml contents in home directory
  cat  > spark.yaml << EOL
  init_config:
  instances:
    - spark_url: ${SPARK_URL}
      cluster_name: ${CLUSTER_NAME_WITH_ENV_SUFFIX}
      spark_cluster_mode: spark_yarn_mode
      tags:
        - instance:${INSTANCE_TAG}
EOL

  #Debug
  ls -l spark.yaml
  cat spark.yaml

  # Set permissions to move spark.yaml to datadog agent conf.d and reset permissions
  sudo chmod 665 ${DD_AGENT_CONF_DIR}
  sudo mv spark.yaml ${DD_AGENT_CONF_DIR}
  sudo chmod 644 ${SPARK_YAML_FILE}
  sudo chown dd-agent:dd-agent ${SPARK_YAML_FILE}
  sudo chown dd-agent:dd-agent ${DD_AGENT_CONF_DIR}
  sudo chmod 755 ${DD_AGENT_CONF_DIR}
  # Restart the Agent so it loads the new Spark check configuration
  sudo systemctl restart datadog-agent || sudo service datadog-agent restart

fi

Invoke install and config scripts via bootstrap actions

You can launch an EMR cluster programmatically, via the AWS CLI, or in the AWS console. Any of these methods offers the option to invoke bootstrap actions. Refer to the AWS documentation for a guide to invoking bootstrap actions while launching clusters from the AWS Console or via the AWS CLI. Below, you can see the Scala code we used to invoke our bootstrap action script while launching the EMR cluster programmatically.

val newClusterJobFlowRequest = new RunJobFlowRequest()
newClusterJobFlowRequest.withBootstrapActions(configureBootstrapActions(config): _*)
                        .withLogUri(logUri)
                        .with...

private def configureBootstrapActions(emrConfig: Config): Seq[BootstrapActionConfig] = {
  val scriptAbsolutePath = s"s3://${emrConfig.s3Bucket.bucketName}/bootstrap-actions/emr-bootstrap-datadog-install.sh"
  // The install script expects four positional arguments:
  // Datadog API key, cluster name, environment tag, and the S3 bucket name
  val scriptBootstrapActionConfig = new ScriptBootstrapActionConfig()
    .withPath(scriptAbsolutePath)
    .withArgs(emrConfig.datadogApiKey,   // field name on our Config is illustrative
              emrConfig.cluster_name,
              emrConfig.stage_env,
              emrConfig.s3Bucket.bucketName)
  val bootstrapAction = new BootstrapActionConfig()
    .withScriptBootstrapAction(scriptBootstrapActionConfig)
    .withName("DatadogInstaller")
  List(bootstrapAction)
}
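For completeness, here is a rough sketch of submitting the assembled request; the client construction is an assumption on our part (credentials and region are resolved from the default provider chain):

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder

// Sketch: submit the configured RunJobFlowRequest to EMR.
// Credentials and region come from the default provider chain in this example.
val emrClient = AmazonElasticMapReduceClientBuilder.defaultClient()
val runJobFlowResult = emrClient.runJobFlow(newClusterJobFlowRequest)
println(s"Launched EMR cluster: ${runJobFlowResult.getJobFlowId}")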

Validate that the integration is properly configured

Finally, to confirm that the bootstrap actions completed successfully, you can check the EMR logs in the S3 log directory you specified while launching the cluster. Bootstrap action logs can be found in a path following this form:

<s3-bucket-name>/<emr_cluster_log_folder>/<emr_cluster_id>/node/<instance_id>/bootstrap-actions

Within a few minutes of deploying your Spark streaming application to your cluster, you should start seeing Spark metrics in Datadog, as shown in the screenshot below:

Visualizing metrics from Apache Spark in Datadog

You can also validate that the Agent check for Spark has been properly configured by SSHing into an EMR instance and executing the following command:

sudo datadog-agent status

In the output, you should see the Spark check being run, as shown below:

=========
Collector
=========

  Running Checks
  ==============
    
    cpu
    ---
        Instance ID: cpu [OK]
        Total Runs: 1
        Metric Samples: 0, Total: 0
        Events: 0, Total: 0
        Service Checks: 0, Total: 0
        Average Execution Time : 0s
        
    ...
    
    spark (1.4.1)
    -------------
        Instance ID: spark:345298b68d34aab9 [OK]
        Total Runs: 2
        Metric Samples: 0, Total: 0
        Events: 0, Total: 0
        Service Checks: 1, Total: 2
        Average Execution Time : 44ms

Monitoring Spark application metrics in Datadog

Now that we have the Datadog Agent collecting Spark metrics from the driver and executor nodes of the EMR cluster, we have also laid the groundwork to publish metrics from our application to Datadog. Because the Datadog Agent is now running on your cluster nodes, you can instrument your application code to publish metrics to the Datadog Agent. As the application developer, you have the best picture of your application’s function, its business logic, and its downstream dependencies, whether it be a database, an API server, or a message bus. Therefore, you have the best idea of what metrics will be useful to monitor the health and performance of your Spark streaming application.

To start collecting custom application metrics in Datadog, launch a reporter thread in the initialization phase of your Spark streaming application, and instrument your application code to publish metrics as events are processed by the application. Spark’s metric system is based on the Dropwizard Metrics library, so you can use a compatible client library like the open source metrics-datadog project to route those metrics to Datadog.
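As an illustration, here is a rough sketch of how such a reporter could be wired up with metrics-datadog from Scala. The registry, metric name, reporting interval, and DogStatsD transport settings are assumptions rather than our exact production setup:

import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry
import org.coursera.metrics.datadog.DatadogReporter
import org.coursera.metrics.datadog.transport.UdpTransport

object StreamingMetrics {
  // Dropwizard registry shared by the application code
  val registry = new MetricRegistry()

  // Counter incremented as telemetry events are processed (name is illustrative)
  val eventsProcessed = registry.counter("telemetry.events.processed")

  // Start a background reporter that flushes metrics to the local Datadog Agent's
  // DogStatsD endpoint (localhost:8125 by default) every 30 seconds.
  def startReporter(): Unit = {
    val transport = new UdpTransport.Builder()
      .withStatsdHost("localhost")
      .withPort(8125)
      .build()

    val reporter = DatadogReporter
      .forRegistry(registry)
      .withTransport(transport)
      .build()

    reporter.start(30, TimeUnit.SECONDS)
  }
}

In this sketch, the driver would call StreamingMetrics.startReporter() once during initialization, and the processing code would call StreamingMetrics.eventsProcessed.inc() as records are handled.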

Spark monitoring from all sides

To recap, in this post we’ve walked through implementing multiple layers of monitoring for Spark applications running on Amazon EMR:

  • Enable the Datadog integration with EMR
  • Run scripts at EMR cluster launch to install the Datadog Agent and configure the Spark check
  • Set up your Spark streaming application to publish custom metrics to Datadog

A Datadog dashboard combining metrics from EMR, Apache Spark, and a Spark streaming application

Once you’re collecting data from your EMR cluster, your Spark nodes, and your application, you can create a beautiful dashboard in Datadog combining all this data to provide visibility into the health of your Spark streaming application.