Welcome, Aspiring Data Engineers, to the second installment of our Data Engineering Essentials series!
Our second post, Unlocking Data Engineering: Simplifying the Journey for Beginners, guided many through the process of reading from APIs and writing into Snowflake. It was like a walk in the park for many beginners, and writing into Snowflake felt like a breeze. If you haven't read it yet, I highly recommend doing so for some quick gratification!
Now, in our third newsletter post, "Simple Steps Towards Data Processing," I've ensured that the code remains straightforward, making it easy for you to understand and execute successfully.
If you ever find yourself stuck at any point, don't hesitate to reach out to me on LinkedIn at kirankbs!
Let's embark on this beginner-friendly journey together, understand the process, and achieve another swift victory!
Agenda: Basic Data Processing
Data Cleaning: Convert Dates & Non numerical values
Feature Engineering: Create new features by deriving insights from existing columns
Data Exploration: Show Summary Statistics & Outliers
For Prerequisites & Setting Up Python Project, please refer to the previous article
Please note that this example utilizes MacOS and Python version 3.8.8. However, users on Linux and Windows systems should encounter no issues!
Data Cleaning
When extracting data from source systems, never assume that the quality of the data is excellent and in the expected formats. As a Data Engineer, you must ensure that it's converted into the correct type for further processing and to clean any unwanted or corrupted data. Make sure the data is read with the proper schema.
# Define the Schema
schema = {
"vendorid": "str",
"lpep_pickup_datetime": "str",
"lpep_dropoff_datetime": "str",
"store_and_fwd_flag": "str",
"ratecodeid": "str",
"pickup_longitude": "float",
"pickup_latitude": "float",
"dropoff_longitude": "float",
"dropoff_latitude": "float",
"passenger_count": "int",
"trip_distance": "float",
"fare_amount": "float",
"extra": "float",
"mta_tax": "float",
"tip_amount": "float",
"tolls_amount": "float",
"imp_surcharge": "float",
"total_amount": "float",
"payment_type": "str",
"trip_type": "str"
}
# Fetch data from API
api_url = "https://data.cityofnewyork.us/resource/hvrh-b6nb.json"
response = requests.get(api_url)
data = response.json()
# Convert data to Pandas DataFrame
df = pd.DataFrame(data, columns=schema.keys())
Convert the date from a string type to a DateTime type for further processing. By converting the date from a string to a DateTime type, you gain access to a variety of helper methods in Pandas that can be utilized.
# Data cleaning
# Example: Convert data types
df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
df['lpep_dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
# Remove non-numeric values from 'trip_distance' to get insights on numbers
df['trip_distance'] = pd.to_numeric(df['trip_distance'], errors='coerce') # Convert non-numeric values to NaN
Well done! The data is now clean and prepared for further processing.
Feature Engineering
Generating new features from existing columns offers users deeper insights and makes visualizing aggregated information much clearer.
Let's derive additional insights from our existing columns. For instance, we can calculate metrics such as trip duration, pickup hour, pickup day of the week, and speed.
df['trip_duration'] = (df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']).dt.total_seconds() / 60
df['pickup_hour'] = df['lpep_pickup_datetime'].dt.hour
df['pickup_dayofweek'] = df['lpep_pickup_datetime'].dt.dayofweek
df['speed'] = df['trip_distance'] / df['trip_duration']
Data Exploration
We've finished cleaning the data and adding new features. Now, let's take the final step and explore the data! The top 100 records from the dataset are displayed in the console.
View Data
Color.printGreen("Check the first few rows of the DataFrame:")
print(df.head(100))
View Missing Values
Color.printGreen("Check for missing values:")
print(df.isnull().sum())
View Summary for Trip Duration
trip_duration_stats = df['trip_duration'].describe()
Color.printGreen("Summary Statistics for Trip Duration:")
print(trip_duration_stats)
View Outliers
df['fare_amount'] = pd.to_numeric(df['fare_amount'], errors='coerce')
outliers = df[df['fare_amount'] < 0]
Color.printGreen("Outliers in fare_amount column:")
print(outliers)
Write To File
Define the features you want to include in the new DataFrame:
selected_features = ['vendorid', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'passenger_count', 'trip_distance',
'fare_amount', 'total_amount', 'trip_duration', 'pickup_hour', 'pickup_dayofweek', 'speed']
Select the specified features from the original DataFrame
new_df = df[selected_features].copy()
Write the new DataFrame to a CSV file
new_df.to_csv('trips_insights.csv', index=False)
Congratulations, If you have come across this far!
This is it! It is this simple to fetch data from API and process it. You don’t need Spark, Flink, or Notebook .. to write simple code.
Note: This application is far fetch from completion but let’s take basic steps and improve this application further!
What Is Next
In the upcoming newsletter, I'll delve into captivating visualizations and dashboards without using PowerBI or Tableau.
Please subscribe for more such content and share for the far reach!