Implement Parallelism in Airflow - Part 2

Christopher Lagali
5 min read · Dec 26, 2019


Photo by Bence Balla-Schottner on Unsplash

Why do all this?

The out-of-the-box configuration of Airflow executes tasks sequentially, which is fine if your DAG depends on that ordering.

However, certain use cases require tasks to run in parallel; for example, loading 3 files into 3 separate tables will finish faster when the loads run in parallel.

This story will guide novice Airflow users through implementing and experimenting with parallelism on their local Airflow installations. For demonstration purposes we have installed Airflow on an EC2 machine; a guide for that setup can be found here:

Once you have Airflow up and running, we can install a PostgreSQL server and use it as the backend for Airflow instead of the default SQLite.

Step 1: Install postgres

sudo yum install postgresql postgresql-server postgresql-devel postgresql-contrib postgresql-docs

Issue Faced: sudo: postgresql-setup: command not found

It appears PostgreSQL made some fairly major naming changes around v9, such that postgresql-setup --initdb and postgresql-setup initdb are now both equivalent to initdb.

First, we might need to change the permissions/ownership of the data directory.

Let us initialize the database with:

initdb -D /var/lib/pgsql92

Issue Faced: initdb: directory "/var/lib/pgsql92" exists

Here, try deleting the folder and rerunning initdb:

sudo rm -rf /var/lib/pgsql92/

Many popular tutorials out there suggest sudo service postgres start.

However, this is what worked for me:

pg_ctl -D /var/lib/pgsql -l logfile start

Initialize postgresql

sudo service postgresql initdb

Start postgresql service

sudo service postgresql start

Let us log into psql to execute our DDL statements.

sudo -u postgres psql

Let's execute the following:

CREATE DATABASE airflow;

Modify pg_hba.conf configuration file

We also need to reconfigure pg_hba.conf to allow connections from Airflow.

nano /var/lib/pgsql9/data/pg_hba.conf

Here we have modified the IPv4 local connections setting to:

host all all  0.0.0.0/0 trust

Note that trust allows connections without a password; that is fine for this demo but should not be used in production.

Save the file and let's modify postgresql.conf.

nano /var/lib/pgsql9/data/postgresql.conf

Here we will remove comments from the following lines:

listen_addresses = '*'
port = 5432

These settings enable the service to listen on any IP address, on port 5432. Airflow will then use this host and port to reach the Postgres server where it stores its metadata.

Let's restart the service so that the changes take effect.

sudo service postgresql restart
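
After the restart, a quick way to confirm that the server is actually accepting connections on port 5432 is a plain TCP check. A minimal sketch, assuming Python 3 is available on the same EC2 machine:

import socket

# Try opening a TCP connection to the port PostgreSQL is configured to listen on.
with socket.create_connection(('localhost', 5432), timeout=3):
    print('PostgreSQL is accepting connections on port 5432')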

We now need to pass the connection info for this PostgreSQL database to the Airflow server that we have running.

Head over to our Airflow Config file named airflow.cfg:

  • Modify sql_alchemy_conn configuration:
sql_alchemy_conn = postgresql+psycopg2://postgres@localhost:5432/airflow
  • Modify executor configuration:
executor = LocalExecutor

Explanation:

  • The connection string provided to sql_alchemy_conn allows Airflow to communicate with the PostgreSQL service using the postgres username.
  • The executor configuration, when set to LocalExecutor, will spawn a number of worker processes equal to the value of parallelism set in the airflow.cfg file.
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
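
Before registering these changes, it is worth sanity-checking that the target of sql_alchemy_conn is reachable with exactly those parameters. A minimal sketch using psycopg2 (the driver implied by the postgresql+psycopg2 dialect); the host, port, user and database simply mirror the connection string above:

import psycopg2

# Connect with the same parameters used in sql_alchemy_conn.
# The trust entry in pg_hba.conf means no password is required here.
conn = psycopg2.connect(host='localhost', port=5432, user='postgres', dbname='airflow')
cur = conn.cursor()
cur.execute('SELECT version();')
print(cur.fetchone()[0])  # prints the PostgreSQL server version string
cur.close()
conn.close()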

Let's register these changes by running:

airflow initdb

If airflow initdb completes without errors, then you are good!
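
You can also confirm from the database side that airflow initdb actually populated the airflow database. A small sketch, again using psycopg2, that lists the tables created in the public schema (expect names such as dag, task_instance and xcom):

import psycopg2

# List the metadata tables that `airflow initdb` created.
conn = psycopg2.connect(host='localhost', port=5432, user='postgres', dbname='airflow')
cur = conn.cursor()
cur.execute("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
    ORDER BY table_name;
""")
print([row[0] for row in cur.fetchall()])
cur.close()
conn.close()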

Note: LocalExecutor is suitable for testing purposes only. CeleryExecutor is the preferred option for production workloads.

We can now test this with a script that I have created.
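
The script itself is not reproduced here, but a minimal sketch of such a DAG looks like the following. It assumes the Airflow 1.x import paths; the DAG id (parallel_demo) and the BashOperator stand-ins for the three Hive loads are illustrative, not the author's actual script:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 12, 1),
}

with DAG(dag_id='parallel_demo',
         default_args=default_args,
         schedule_interval=None,
         catchup=False) as dag:

    hello_task = BashOperator(
        task_id='hello_task',
        bash_command='echo "Hello from Airflow"',
    )

    # Stand-ins for the three Hive loads; each could instead call `hive -f <script>.hql`.
    hive_tasks = [
        BashOperator(
            task_id='hive_task_{}'.format(i),
            bash_command='sleep 10',  # simulate a long-running load
        )
        for i in range(1, 4)
    ]

    # Fan out: once hello_task succeeds, all three downstream tasks become runnable at once.
    hello_task >> hive_tasks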

Start the Airflow webserver and scheduler:

airflow webserver
airflow scheduler

Once up, let us locate our DAG and trigger it.

The Graph View of the DAG shows three tasks that will be triggered in parallel after hello_task. The true test of parallelism is whether all of these tasks are triggered and completed simultaneously.

Once hello_task is completed, all three Hive tasks are attempted at the same time, as shown by the light green (running) box on each of these tasks.

Finally!! All three Hive tasks have completed successfully, and at the same time, which means that our configuration is spot on!

SequentialExecutor, in this case, would have executed these tasks one after the other irrespective of the task flow. So, switching the executor to LocalExecutor or CeleryExecutor is essential for this configuration to work!
