Executando jobs apache spark no apache airflow

Jozimar Back
4 min readNov 28, 2020

Apache Spark é uma solução que auxilia e muito no processamento de dados de forma distribuída. Para automatizar as tarefas de processamento distribuído, uma ótima solução é o escalonamento destas tarefas dentro do Apache Airflow. Neste tutorial compartilho formas de criar DAG’s no Apache Airflow capazes de executar tarefas Apache Spark em ambiente on premisse.

spiral firework in mountains by Wil Stewart
Photo by Wil Stewart on Unsplash

Preparando ambiente na máquina Airflow

A maquina que hospeda o Airflow em que realizei as configurações roda com Debian 9. Para rodar Spark no Airflow utilizando PythonOperator e do BashOperator deve ser configurado no mínimo o JAVA_HOME. Caso não tenha o java instalado, instale com os seguintes comandos:

sudo apt update
sudo apt install default-jdk

Após a instalação, deve ser informado o JAVA_HOME no sistema operacional mapeando o local da instalação do java. Por exemplo, no Debian, no arquivo .bashrc que fica no diretório root, você deve informar a seguinte linha:

export JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'
export PATH=$PATH:$JAVA_HOME/bin

Caso esteja no linux, após editar o arquivo lembre-se de rodar:

source ~/.bashrc

Para rodar script utilizando o operador SparkSubmitOperator, além do Java mapeado, deve ser adicionado e mapeado os binários do Spark. Na pagina do Spark você pode fazer o download do arquivo tgz e descompactar na maquina que hospeda o Airflow. Defina no arquivo .bashrc o SPARK_HOME e adicione ao PATH do sistema.

export SPARK_HOME='/opt/spark'
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

Por fim você deve adicionar o pacote pyspark ao ambiente em que roda o Airflow.

pip install pyspark

Nos exemplos a seguir, temos exemplo de uma tarefa Spark que faz ETL de base SQL para base mongo. A versão do Spark utilizada foi a 3.0.1 que é compatível com o pacote de conexão org.mongodb.spark:mongo-spark-connector_2.12:3.0.0.

PythonOperator

Com PythonOperator basta criar o método de execução que irá rodar a tarefa Spark e enviar a job a partir do Airflow. O problema desta abordagem é que você fica sem os detalhes de logs da execução da tarefa Spark.

script dag airflow com pythonoperator

BashOperator

Para utilizar este operador, você pode criar um arquivo python com o código Spark e outro arquivo python contendo o código da DAG para o Airflow. Dentro do BashOperator, o parâmetro bash_command recebe o comando que será executado no bash do sistema operacional. Nesta parâmetro por exemplo pode ser executado comando python jobspark.py.

Neste operador você terá mais detalhes de logs para acompanhar a tarefa Spark. Cotando assim com informações sobre o estagio e o percentual de conclusão da tarefa da mesma maneira que seria no terminal

Detalhe do log no BashOperator

SparkSubmitOperator

Para utilizar este operador, após ter JAVA_HOME e os binarios do Spark mapeados na máquina Airflow, você deverá cadastrar a conexão com a maquina master do cluster Spark no painel administrativo do Airflow.

Cadastro conexão spark master

No SparkSubmitOperator no parâmetro conn_id será utilizado o Conn Id cadastrado através do painel administrativo. Uma das principais vantagens que considero neste operador é poder configurar e informar todas os propriedade do job Spark. Deixando assim o script Spark mais enxuto com praticamente somente a logica a ser enviada e executada no cluster. Por baixo dos panos este operador utiliza comando bash spark-submit utilizando as definições informadas no operador.

Exemplo SparkSubmitOperator

Neste operador os logs da tarefa ficam muito mais detalhados, contendo informações do TaskSetManager sobre cada tarefa iniciada e finalizada

Tratamento de possíveis erros

Durante a criação de DAG’s tive alguns contratempos e nesta seção gostaria de compartilhar como resolver.

ERROR — [Errno 2] No such file or directory: ‘bash’

Isto ocorre no airflow ao executar algum comando bash. Por algum motivo a instalação pode não reconhecer o caminho do bash do sistema operacional. Para resolver adicione a propriedade env no BashOperator informando o PATH que contem o bash. No SparkSumbitOperator você deve informar o PATH na propriedade env_vars. Segue exemplo:

print_path_env_task = BashOperator(
task_id='elt_documento_pagar_spark',
bash_command="python ./dags/spark-jdbc-sql-test.py",
dag=dag,
env={'PATH': '/bin:/usr/bin:/usr/local/bin'}
)
task = SparkSubmitOperator(
task_id='elt_documento_pagar_spark',
conn_id='spark',
application="./dags/spark-jdbc-sql-test.py",
env_vars={'PATH': '/bin:/usr/bin:/usr/local/bin'},
packages="org.mongodb.spark:mongo-spark-connector_2.12:3.0.0,com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre8"
)

SyntaxError: Non-ASCII character

Isto ocorre ao processar o código Spark que contem caracteres especiais. Para resolver basta no topo do arquivo python escrever o seguinte comentário.

# -*- coding: utf-8 -*-

--

--

Jozimar Back

I write articles about my experience in Data Engineering.