Concepts

  • Batch processing collection of data from multiple sources and combined to become a single source of information.
  • a type of batch processing called Extract, Transform and Load (ETL)
  • the process of extracting large amounts of data from multiple sources and formats and transforming it into one specific format before loading it into a database or target file.

Block Diagram of ETL

Extraction

Python Composite Functions

import glob
import pandas as pd

list_csv = glob.glob("*.csv")
list_json = glob.glob("*.json")

df_csv = pd.read_csv("source.csv")
df_json = pd.read_json("source.json", lines=True)

Extract Function

def extract():
    extracted_data = pd.DataFrame(columns=['name', 'height', 'weight'])

    for csvfile in glob.glob('*.csv'):
        extracted_data = extracted_data.append(
            pd.read_csv((csvfile),
            ignore_index=True
        )

    for jsonfile in glob.glb("*.json"):
        extracted_data = extracted_data.append(
            pd.read_json(jsonfile, lines=True),
            ignore_index=True
        )

    return extracted_data

If we did not set the parameter “ignore_index” to true, then the index of the data frame “extracted_data” would be the same as the row Number of the original file.

If we “ignore index” to true, then the order of each row would be the same as the order the row was appended to the data frame.

Transform

def transform(data):
    data['height'] = round(data.height * 0.0254, 2)
    data['weight'] = round(data.weight * 0.45359237, 2)

    return data

Load

def load(targetfile, data_to_laod):
    data_to_load.to_csv(targetfile)
    targetfile = 'transformed_data.csv'
    load(targetfile, transformed_data)

Logging Entries

from datatime from datetime

def log(message):
    timestamp_format = "%Y-%h-%d-%H:%M:%S"
    now = datetime.now()
    timestamp = now.strftime(timestamp_format)

    with open("logfile.txt", "a") as f:
        f.write (timestamp + ',' + message + '\n')

Hands-on Project

The complex Extract, Transform, and Loading operations involves following steps:

  • Extract relevant information from websites using Webscraping and requests API.
  • Transform the data to a required format.
  • Load the processed data to a local file or as a database table.
  • Query the database table using Python.
  • Create detailed logs of all operations conducted.

Solutions

Top Countries GDP - ETL Process

"""Module: Code for ETL operations on Country-GDP data"""

import os
import sqlite3
import requests
import numpy as np
import pandas as pd

from bs4 import BeautifulSoup
from datetime import datetime

os.system("clear")

WIKIPEDIA_GDP_URL = (
    "https://web.archive.org/web/20230902185326/https://en.wikipedia.org/wiki/List_of_countries_by_GDP_%28nominal%29"
)
TABLE_ATTRIBUTES = ["Country", "GDP_USD_millions"]
DB_NAME = "world_economies.db"
TABLE_NAME = "countries_by_gdp"
CSV_PATH = "countries_by_gdp.csv"


def extract(url, table_attributes):
    """
    This function extracts the required
    information from the website and saves it to a dataframe. The
    function returns the dataframe for further processing.
    """
    page_output = requests.get(url, timeout=10)
    html_soup = BeautifulSoup(page_output.text, "html.parser")
    gdp_df = pd.DataFrame(columns=table_attributes)

    tables = html_soup.find_all("tbody")
    rows = tables[2].find_all("tr")

    for row in rows:
        data = row.find_all("td")

        if len(data) == 0:
            continue

        country = data[0].find("a")
        gdp = data[2]

        if country is None or "—" in gdp:
            continue

        row_dict = {"Country": country.text, "GDP_USD_millions": gdp.text}
        row_df = pd.DataFrame(row_dict, index=[0])
        gdp_df = pd.concat([gdp_df, row_df], ignore_index=[0])

    return gdp_df


def convert_currency_to_numeric(currency):
    """
    This function converts the currency text into numeric form.
    """
    return float("".join(currency.split(",")))


def convert_millions_to_billions(millions):
    """
    This function convert millions value to billions and round
    to 2 decimal places.
    """
    return np.round(millions / 1000, 2)


def transform(df):
    """
    This function converts the GDP information from Currency
    format to float value, transforms the information of GDP from
    USD (Millions) to USD (Billions) rounding to 2 decimal places.
    The function returns the transformed dataframe.
    """
    gdp_list = df["GDP_USD_millions"].tolist()
    gdp_list = [convert_millions_to_billions(convert_currency_to_numeric(gdp_value)) for gdp_value in gdp_list]
    df["GDP_USD_millions"] = gdp_list
    df = df.rename(columns={"GDP_USD_millions": "GDP_USD_billions"})

    return df


def load_to_csv(df, csv_path):
    """
    This function saves the final dataframe as a `CSV` file
    in the provided path. Function returns nothing.
    """

    df.to_csv(csv_path)


def load_to_db(df, sql_connection, table_name):
    """
    This function saves the final dataframe as a database table
    with the provided name. Function returns nothing.
    """
    df.to_sql(table_name, sql_connection, if_exists="replace", index=False)


def run_query(statement, sql_connection):
    """
    This function runs the stated query on the database table and
    prints the output on the terminal. Function returns nothing.
    """
    print("Query Statement:", statement)
    output = pd.read_sql(statement, sql_connection)
    print("Query Output:", output)


def log_progress(message):
    """
    This function logs the mentioned message at a given
    stage of the code execution to a log file.
    Function returns nothing
    """
    timestamp_format = "%Y-%h-%d-%H:%M:%S"
    now = datetime.now()
    timestamp = now.strftime(timestamp_format)

    with open("./code_log.txt", "a") as f:
        f.write(timestamp + " : " + message + "\n")


if __name__ == "__main__":
    log_progress("Preliminaries complete. Initiating ETL process.")

    extracted_data = extract(WIKIPEDIA_GDP_URL, TABLE_ATTRIBUTES)
    log_progress("Data Extraction complete. Initiating transformation process.")

    transform_data = transform(extracted_data)
    log_progress("Data Transformation complete. Initiating loading process.")

    load_to_csv(transform_data, "gdp.csv")
    log_progress("Data load to CSV file complete. Initiating SQL connection.")

    sql_connection = sqlite3.connect(DB_NAME)
    log_progress("SQL Connection Initiated. Initiating loading process.")

    load_to_db(transform_data, sql_connection, TABLE_NAME)
    log_progress("Data load to database complete. Running the query")

    query_statement = f"SELECT * from {TABLE_NAME} WHERE GDP_USD_billions >= 100"
    run_query(query_statement, sql_connection)
    log_progress("Run query complete. Closing SQL connection.")

    sql_connection.close()
    log_progress('Process Complete. Connection Closed.')

World top Banks - ETL Process

"""Module: Code for ETL operations on Bank data"""

import os
import sqlite3
import requests
import numpy as np
import pandas as pd

from bs4 import BeautifulSoup
from datetime import datetime

os.system("clear")

WIKIPEDIA_BANK_URL = "https://web.archive.org/web/20230908091635/https://en.wikipedia.org/wiki/List_of_largest_banks"
TABLE_ATTRIBUTES = ["Bank_Name", "USD_Billion"]
DB_NAME = "banks.db"
TABLE_NAME = "largest_banks"
CSV_PATH = "banks.csv"


def extract(url, table_attributes):
    """
    This function extracts the required
    information from the website and saves it to a dataframe. The
    function returns the dataframe for further processing.
    """
    page_output = requests.get(url, timeout=10)
    html_soup = BeautifulSoup(page_output.text, "html.parser")
    bank_df = pd.DataFrame(columns=table_attributes)

    tables = html_soup.find_all("tbody")
    rows = tables[0].find_all("tr")

    for row in rows:
        data = row.find_all("td")

        if not data:
            continue

        bank = data[1].find_all("a")
        billion = data[2]

        row_dict = {"Bank_Name": bank[1].text, "USD_Billion": billion.text}
        row_df = pd.DataFrame(row_dict, index=[0])
        bank_df = pd.concat([bank_df, row_df], ignore_index=[0])

    return bank_df


def convert_currency_to_numeric(currency):
    """
    This function converts the currency text into numeric form.
    """
    return float("".join(currency.split(",")))


def transform(df):
    """
    This function accesses the CSV file for exchange rate
    information, and adds three columns to the data frame, each
    containing the transformed version of Market Cap column to
    respective currencies
    """
    currency_df = pd.read_csv("exchange_rate.csv")
    exchange_rate = currency_df.set_index("Currency").to_dict()["Value"]
    df["USD_Billion"] = [convert_currency_to_numeric(x) for x in df["USD_Billion"]]
    df["GBP_Billion"] = [np.round(x * exchange_rate["GBP"], 2) for x in df["USD_Billion"]]
    df["EUR_Billion"] = [np.round(x * exchange_rate["EUR"], 2) for x in df["USD_Billion"]]
    df["NPR_Billion"] = [np.round(x * exchange_rate["NPR"], 2) for x in df["USD_Billion"]]

    return df


def load_to_csv(df, csv_path):
    """
    This function saves the final data frame as a `CSV` file
    in the provided path. Function returns nothing.
    """

    df.to_csv(csv_path)


def load_to_db(df, sql_connection, table_name):
    """
    This function saves the final data frame as a database table
    with the provided name. Function returns nothing.
    """
    df.to_sql(table_name, sql_connection, if_exists="replace", index=False)


def run_query(statement, sql_connection):
    """
    This function runs the stated query on the database table and
    prints the output on the terminal. Function returns nothing.
    """
    print("Query Statement:", statement)
    output = pd.read_sql(statement, sql_connection)
    print("Query Output:", output)


def log_progress(message):
    """
    This function logs the mentioned message at a given
    stage of the code execution to a log file.
    Function returns nothing
    """
    timestamp_format = "%Y-%h-%d-%H:%M:%S"
    now = datetime.now()
    timestamp = now.strftime(timestamp_format)

    with open("./code_log.txt", "a") as f:
        f.write(timestamp + " : " + message + "\n")


if __name__ == "__main__":
    log_progress("Preliminaries complete. Initiating ETL process.")

    extracted_data = extract(WIKIPEDIA_BANK_URL, TABLE_ATTRIBUTES)
    log_progress("Data Extraction complete. Initiating transformation process.")

    transform_data = transform(extracted_data)
    log_progress("Data Transformation complete. Initiating loading process.")

    load_to_csv(transform_data, "banks.csv")
    log_progress("Data load to CSV file complete. Initiating SQL connection.")

    sql_connection = sqlite3.connect(DB_NAME)
    log_progress("SQL Connection Initiated. Initiating loading process.")

    load_to_db(transform_data, sql_connection, TABLE_NAME)
    log_progress("Data load to database complete. Running the query")

    query_statement = f"SELECT * FROM {TABLE_NAME}"
    run_query(query_statement, sql_connection)
    log_progress("Run query 1 complete. Closing SQL connection.")

    query_statement = f"SELECT AVG(NPR_Billion) FROM {TABLE_NAME}"
    run_query(query_statement, sql_connection)
    log_progress("Run query 2 complete. Closing SQL connection.")

    query_statement = f"SELECT * FROM {TABLE_NAME} LIMIT 5"
    run_query(query_statement, sql_connection)
    log_progress("Run query 3 complete. Closing SQL connection.")

    sql_connection.close()
    log_progress("Process Complete. Connection Closed.")