Seamless Data Flow: Fetching from AWS RDS to S3 with Apache Airflow

This blog demonstrates how to fetch data from an Amazon RDS MySQL database and store it in an S3 bucket using Apache Airflow.

Setting up the Database

Let's get started by creating an RDS instance from the AWS Management Console. To stay within the free tier, you can use the settings below. Select "MySQL" as the engine option and choose the latest engine version.

Select the "Free Tier" template and leave everything else at its defaults. Also remember to provide a master password for your self-managed database credentials and enable "Public access" under Connectivity.

Lastly, expand Additional Configuration, provide an initial database name, and click Create. It will take a few minutes for the database to be up and running.
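
If you prefer to script the setup instead of clicking through the console, the same instance can be created with boto3. The sketch below is only illustrative: the identifier, database name, region, and password are placeholders of my own choosing, and it assumes boto3 is installed and your AWS credentials are configured.

import boto3

# Sketch: create a MySQL RDS instance similar to the console setup above.
# DBInstanceIdentifier, DBName, MasterUsername, MasterUserPassword, and the
# region are placeholders; replace them with your own values.
rds = boto3.client('rds', region_name='us-east-1')

rds.create_db_instance(
    DBInstanceIdentifier='airflow-demo-db',
    DBName='demodb',                           # the initial database name
    Engine='mysql',
    DBInstanceClass='db.t3.micro',             # commonly free-tier eligible
    AllocatedStorage=20,
    MasterUsername='admin',
    MasterUserPassword='your-strong-password',
    PubliclyAccessible=True,                   # matches the "Public access" setting above
)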

Next, we need to download Sqlectron to connect to our database. Sqlectron is a free, open-source database management application that supports MySQL, MariaDB, PostgreSQL, and various other relational databases.

Once the download is complete, provide the details of your database in Sqlectron, save the configuration, and connect to your database.

Next, we need to create a table and insert some sample data into the MySQL database. You can use the following queries (the values in the INSERT are just examples):

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
);
-- id is generated by AUTO_INCREMENT, so it can be omitted
INSERT INTO users (name, email)
VALUES ('Jane Doe', 'jane.doe@example.com'),
       ('John Smith', 'john.smith@example.com');
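
If you would rather seed the table from a script, here is a rough sketch using pymysql (an extra dependency, installed with pip) with parameterized inserts. The host, credentials, database name, and sample rows are placeholders; use your own RDS values.

import pymysql

# Sketch: insert sample rows into the users table with parameterized queries.
# The connection details below are placeholders for your RDS instance.
conn = pymysql.connect(
    host='your-rds-endpoint.rds.amazonaws.com',
    user='admin',
    password='your-strong-password',
    database='demodb',
    port=3306,
)

rows = [
    ('Jane Doe', 'jane.doe@example.com'),
    ('John Smith', 'john.smith@example.com'),
]

try:
    with conn.cursor() as cursor:
        # pymysql uses %s placeholders; id is filled in by AUTO_INCREMENT
        cursor.executemany(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            rows,
        )
    conn.commit()
finally:
    conn.close()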

Creating the S3 Bucket

To create an S3 bucket, go to the AWS Management Console and search for S3. Give your bucket a globally unique name and create it.
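
If you want to script this step as well, a minimal boto3 sketch looks like the following; the bucket name and region are placeholders, and the bucket name must be globally unique.

import boto3

# Sketch: create the S3 bucket programmatically. Replace the bucket name with your own.
s3 = boto3.client('s3', region_name='us-east-1')

# In regions other than us-east-1, also pass
# CreateBucketConfiguration={'LocationConstraint': '<your-region>'}.
s3.create_bucket(Bucket='your-bucket-name')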

Writing the DAG and setting up Airflow

Now, finally, we need to connect all the components so that Airflow can orchestrate the pipeline with ease. To do so, we need to set up the connections in the Airflow UI. If you haven't set up Airflow before, you can check out my blog on getting started with Airflow.

Configure Connections in Airflow

  1. MySQL Connection:

    • Go to the Airflow UI (http://localhost:8080).

    • Navigate to Admin > Connections.

    • Add a new connection:

      • Conn Id: my-sql-default

      • Conn Type: MySQL

      • Host: Your MySQL host

      • Schema: Your MySQL database name

      • Login: Your MySQL username

      • Password: Your MySQL password

      • Port: 3306 (default MySQL port)

  2. AWS Connection:

    • Similarly, add a new connection for AWS if it doesn't already exist:

      • Conn Id: aws_default

      • Conn Type: Amazon Web Services

      • Login: Your AWS Access Key ID

      • Password: Your AWS Secret Access Key

  3. Create a DAG file:

    You can use the following DAG file as an example. It relies on the apache-airflow-providers-mysql and apache-airflow-providers-amazon packages (plus pandas), so make sure these are installed in your Airflow environment:

     from airflow import DAG
     from airflow.providers.amazon.aws.hooks.s3 import S3Hook
     from airflow.providers.mysql.hooks.mysql import MySqlHook
     from airflow.operators.python import PythonOperator
     from airflow.utils.dates import days_ago
     import pandas as pd
     import io
    
     default_args = {
         'owner': 'airflow',
         'depends_on_past': False,
         'email_on_failure': False,
         'email_on_retry': False,
         'retries': 1,
     }
    
     def fetch_data_from_mysql(**context):
         # Create a MySQL hook
         mysql_hook = MySqlHook(mysql_conn_id='my-sql-default')
    
         # Define the SQL query
         sql = "SELECT * FROM table_name"
    
         # Fetch data from MySQL
         df = mysql_hook.get_pandas_df(sql)
    
         # Push the DataFrame to XCom
         context['task_instance'].xcom_push(key='data_frame', value=df.to_dict())
    
     def upload_to_s3(**context):
         # Pull the DataFrame from XCom
         df_dict = context['task_instance'].xcom_pull(task_ids='fetch_data_from_mysql', key='data_frame')
         df = pd.DataFrame.from_dict(df_dict)
    
         s3_hook = S3Hook(aws_conn_id='aws_default')
         bucket_name = 'your-bucket-name'
         key = 'your-file-name.csv'
    
         # Convert DataFrame to CSV
         csv_buffer = io.StringIO()
         df.to_csv(csv_buffer, index=False)
         csv_buffer.seek(0)
    
         # Upload CSV to S3
         s3_hook.load_string(
             csv_buffer.getvalue(),
             key,
             bucket_name,
             replace=True
         )
    
     with DAG(
         'mysql_to_s3_dag',
         default_args=default_args,
         description='A simple DAG to fetch data from MySQL and store in S3',
         schedule_interval='@daily',
         start_date=days_ago(1),
         catchup=False,
     ) as dag:
    
         fetch_data = PythonOperator(
             task_id='fetch_data_from_mysql',
             python_callable=fetch_data_from_mysql,
         )

         upload_data = PythonOperator(
             task_id='upload_to_s3',
             python_callable=upload_to_s3,
         )
    
         fetch_data >> upload_data
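
Before triggering the DAG, you can sanity-check both connection IDs from a Python shell inside your Airflow environment. This is just a quick sketch that reuses the same hooks as the DAG; the bucket name is the same placeholder as before.

# Sketch: verify that the 'my-sql-default' and 'aws_default' connections resolve correctly.
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

mysql_hook = MySqlHook(mysql_conn_id='my-sql-default')
print(mysql_hook.get_first("SELECT COUNT(*) FROM users"))   # should print the row count

s3_hook = S3Hook(aws_conn_id='aws_default')
print(s3_hook.check_for_bucket('your-bucket-name'))         # True if the bucket is reachable
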
    

This Airflow DAG (Directed Acyclic Graph) automates a workflow that involves fetching data from a MySQL database and uploading it to an S3 bucket on AWS. Here's a breakdown of what this DAG does:

  1. DAG Definition and Default Arguments:

    • The DAG is named 'mysql_to_s3_dag'.

    • It uses default arguments such as the owner ('airflow'), no email notifications on failure or retry, and one retry on failure.

    • The DAG runs on a daily schedule and starts from one day ago (days_ago(1)).

  2. Task 1 - fetch_data_from_mysql:

    • This task uses a PythonOperator to execute the fetch_data_from_mysql function.

    • Inside this function, a MySqlHook is used to connect to a MySQL database (using the connection ID 'my-sql-default').

    • It runs an SQL query to select all data from the table.

    • The resulting data is converted into a Pandas DataFrame.

    • The DataFrame is then converted to a dictionary and pushed to XCom (Airflow's cross-task communication mechanism) so that other tasks can use it.

  3. Task 2 - upload_to_s3:

    • This task uses another PythonOperator to execute the upload_to_s3 function.

    • It retrieves the dictionary from XCom (produced by the previous task) and rebuilds the DataFrame.

    • The DataFrame is then converted to a CSV format using an in-memory buffer (io.StringIO).

    • An S3Hook is used to connect to an S3 bucket (using the connection ID 'aws_default').

    • The CSV data is uploaded to the specified S3 bucket under the given key.

  4. Task Dependencies:

    • The upload_to_s3 task depends on the successful completion of the fetch_data_from_mysql task, ensuring that data fetching happens before uploading.

In summary, this DAG performs the following:

  • It extracts data from a MySQL table.

  • It transforms the data into a CSV file format.

  • It loads the CSV file into an S3 bucket on AWS.
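
As a side note, if your installation includes a recent version of the Amazon provider, the same extract-and-load can be done with a single SqlToS3Operator task instead of the two PythonOperator tasks above. A minimal sketch, reusing the connection IDs and the bucket/key placeholders from the DAG:

# Sketch: single-task alternative using the Amazon provider's SqlToS3Operator.
# Assumes a recent apache-airflow-providers-amazon and the connections configured earlier.
# Place this inside the `with DAG(...)` block in place of the two tasks above.
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

mysql_to_s3 = SqlToS3Operator(
    task_id='mysql_to_s3',
    sql_conn_id='my-sql-default',
    query='SELECT * FROM users',
    aws_conn_id='aws_default',
    s3_bucket='your-bucket-name',
    s3_key='your-file-name.csv',
    file_format='csv',
    replace=True,
)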

If you compare the time of the last scheduled run of the DAG with the upload time of test.csv in the S3 bucket, you can confirm that the pipeline ran end to end. I hope this helps you successfully deploy a DAG in Airflow.