Implement Parallelism in Airflow - Part 2
Why do all this?
The out-of-the-box configuration of Airflow executes tasks sequentially, which is fine if your DAG depends on tasks running one after another.
However, certain use cases require tasks to run in parallel: for example, loading 3 files into 3 separate tables finishes faster when the loads run at the same time.
This story will guide novice Airflow users through implementing and experimenting with parallelism on their local Airflow installations. For demonstration purposes we have installed Airflow on an EC2 machine, a guide for which can be found here:
Once you have Airflow up and running, we can install a PostgreSQL server and use it as the back end for Airflow instead of the default SQLite.
Step 1: Install postgres
sudo yum install postgresql postgresql-server postgresql-devel postgresql-contrib postgresql-docs
Issue Faced: sudo: postgresql-setup: command not found
It appears PostgreSQL made some fairly major naming changes around v9, such that postgresql-setup --initdb and postgresql-setup initdb are now equivalent to running initdb directly.
First, we might need to change the permissions/ownership of the data directory.
Let us initialize the database with:
initdb -D /var/lib/pgsql92
Issue Faced: initdb: directory "/var/lib/pgsql92" exists
Here, try deleting the folder and rerunning initdb:
sudo rm -rf /var/lib/pgsql92/
Many popular tutorials out there suggest sudo service postgres start.
However, this is what worked for me:
pg_ctl -D /var/lib/pgsql -l logfile start
Initialize PostgreSQL:
sudo service postgresql initdb
Start the PostgreSQL service:
sudo service postgresql start
Let us log in to psql to execute our DDL statements.
sudo -u postgres psql
Let's execute the following:
CREATE DATABASE airflow;
Modify the pg_hba.conf configuration file
We also need to reconfigure pg_hba.conf to allow connections from Airflow.
nano /var/lib/pgsql9/data/pg_hba.conf
Here we have modified the IPv4 local connection setting to:
host all all 0.0.0.0/0 trust
Save the file, and let's modify postgresql.conf.
nano /var/lib/pgsql9/data/postgresql.conf
Here we will uncomment the following lines:
listen_addresses = '*'
port = 5432
These settings make the service listen on any IP address on port 5432. Airflow will then use this host and port to store its metadata in the PostgreSQL server.
Let's restart the service so that the changes take effect.
sudo service postgresql restart
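Optionally, we can run a quick connectivity check before wiring this into Airflow. The snippet below is a minimal sketch (assuming the psycopg2 package is installed and the airflow database created above exists) that confirms the server now accepts TCP connections on port 5432:

# Sanity check: connect to the local PostgreSQL server over TCP.
# Assumes psycopg2 is installed and the "airflow" database was created above.
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="postgres",
    dbname="airflow",
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])  # prints the PostgreSQL server version string
conn.close()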
We now need to pass the connection info of the PostgreSQL database to the Airflow server we have running.
Head over to the Airflow config file named airflow.cfg:
- Modify sql_alchemy_conn configuration:
sql_alchemy_conn = postgresql+psycopg2://postgres@localhost:5432/airflow
- Modify executor configuration:
executor = LocalExecutor
Explanation:
- The connection string provided to sql_alchemy_conn allows Airflow to communicate with the PostgreSQL service using the postgres user.
- When the executor configuration is set to LocalExecutor, Airflow will spawn worker processes up to the parallelism value set in the airflow.cfg file.
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
Let's register these changes by running:
airflow initdb
If you see this type of screen, then you are good!
Note: LocalExecutor is suitable for testing purposes only; CeleryExecutor is the preferred option for production workloads.
We can now test this with a script that I have created.
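The DAG used here has one hello_task fanning out to three tasks that should run in parallel. A minimal sketch of a comparable DAG is shown below; the dag_id, task names, BashOperator placeholders, and sleep commands are illustrative assumptions standing in for the actual Hive tasks:

# Minimal sketch of a DAG with one upstream task and three parallel
# downstream placeholder tasks (illustrative only, not the original script).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "start_date": datetime(2020, 1, 1),
}

dag = DAG(
    dag_id="parallel_demo",
    default_args=default_args,
    schedule_interval=None,  # trigger manually from the UI
    catchup=False,
)

hello_task = BashOperator(
    task_id="hello_task",
    bash_command="echo 'hello'",
    dag=dag,
)

# Three independent placeholder tasks; with LocalExecutor they run simultaneously.
parallel_tasks = [
    BashOperator(
        task_id="parallel_task_{}".format(i),
        bash_command="sleep 30",
        dag=dag,
    )
    for i in range(1, 4)
]

# Each placeholder depends only on hello_task, so all three can start together.
for task in parallel_tasks:
    hello_task >> task

Save such a file in your dags folder (the dags_folder path from airflow.cfg) so that the scheduler picks it up.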
Start the Airflow server:
airflow webserver
airflow scheduler
Once up, let us locate our DAG and trigger it.
The Graph View of the DAG shows three tasks that will be triggered in parallel after the hello_task. The true test of parallelism is whether all of these tasks are triggered and completed simultaneously.
Once hello_task has completed, all three Hive tasks are attempted at the same time, as shown by the light green (running) box on each of these tasks.
Finally! All three Hive tasks have completed successfully and at the same time, which means our configuration is spot on!
The SequentialExecutor, in this case, would have executed these tasks one after the other irrespective of the task flow. So, switching the executor to LocalExecutor or CeleryExecutor is essential for this setup to work!