This post walks through setting up PostgreSQL, ingesting NYC Taxi trip record data into it, and managing everything with Docker. The trip record data is available here
1. Setting Up the Environment
To start off, I set up a virtual environment named de-zoomcamp
using Anaconda:
conda create -n de-zoomcamp python=3.9.1
conda activate de-zoomcamp
conda install -c conda-forge pandas jupyter sqlalchemy pyarrow psycopg2 pgcli
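A quick, optional check that the key packages import cleanly in the new environment:
python -c "import pandas, sqlalchemy, pyarrow, psycopg2; print('all good')"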
With the environment ready, let’s move on to PostgreSQL.
2. Running PostgreSQL on Docker
We’ll use Docker to run PostgreSQL, an open-source relational database management system. Here’s the command:
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v /path/to/postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
postgres:13
Here, the -e, -v, and -p options play specific roles:
- -e (environment variables): -e POSTGRES_USER="root" and -e POSTGRES_PASSWORD="root" set the administrative username and password to “root”, while -e POSTGRES_DB="ny_taxi" specifies the default database name as “ny_taxi”.
- -v <host_path>:<container_path> (volume mounting): mounts directories or files from the host machine into the container, ensuring that database files persist even after the container is restarted. Remember, always use an absolute path for the host directory to avoid path resolution issues.
- -p <host_port>:<container_port> (port binding): maps a port on the host to a port inside the container. In this case, -p 5432:5432 allows the PostgreSQL service in the container to be accessed through port 5432 on the host.
For more details on the Postgres Docker image, check here
With PostgreSQL running, let’s connect to it using the pgcli
package. Run the following command in a new terminal session after activating the virtual environment:
pgcli -h localhost -p 5432 -u root -d ny_taxi
You’ll be prompted to enter a password; use “root” as specified earlier. Once connected, run \dt
to list tables. At this stage, no tables exist, so let’s proceed to add the NYC Taxi data to our database.
3. Adding NYC Taxi Data to the Postgres Database
Downloading ride data
First, I will download the NYC Taxi data set from the NYC website. The dataset goes back to 2009 and is now provided in the efficient .parquet format (not the .csv format used in the lecture videos). .parquet is a columnar storage format that offers significant benefits, such as compression that reduces storage requirements. So I made slight adjustments to the original code to accommodate this format.
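Once you have a file downloaded (the download script is below), you can peek at the Parquet layout with pyarrow to see the row counts, row groups, and column types stored in the file. A quick sketch, assuming a path like taxi_data/yellow/yellow_tripdata_2023-01.parquet:
import pyarrow.parquet as pq

pf = pq.ParquetFile('taxi_data/yellow/yellow_tripdata_2023-01.parquet')
print(pf.metadata)        # number of rows, row groups, and file-level metadata
print(pf.schema_arrow)    # column names and types stored in the file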
The dataset is divided into four categories based on the type of ride. As a former New Yorker, I will give a brief overview of each:
- Yellow Taxi: The iconic yellow cabs you’ve likely seen in movies and TV shows.
- Green Taxi: Known as “boro taxis,” these serve upper Manhattan (above 96th St) and areas outside Manhattan.
- For-hire vehicle (FHV): Includes services like limousines.
- High-volume for-hire vehicle (FHVHV): Primarily ride-sharing services such as Uber and Lyft.
Since the data is split by month and category, I wrote a bash
script named taxi_download.sh
to automate downloading for a selected range of years and months.
#!/bin/bash
### taxi_download.sh ###
# Define the base URL
BASE_URL="https://d37ci6vzurychx.cloudfront.net/trip-data"
# Define the output directory
OUTDIR="taxi_data"
# Specify the range of years, months, and categories
START_YEAR=2023
END_YEAR=2023
MONTHS=("01" "02" "03" "04" "05" "06" "07" "08" "09" "10" "11" "12")
CATEGORIES=("yellow" "green" "fhv" "fhvhv")
mkdir -p "$OUTDIR"
# Loop through each category, year, and month
for CATEGORY in "${CATEGORIES[@]}"; do
    mkdir -p "$OUTDIR/$CATEGORY"
    for YEAR in $(seq $START_YEAR $END_YEAR); do
        for MONTH in "${MONTHS[@]}"; do
            # Construct the file URL and output file name
            FILE_URL="${BASE_URL}/${CATEGORY}_tripdata_${YEAR}-${MONTH}.parquet"
            OUTPUT_FILE="${OUTDIR}/${CATEGORY}/${CATEGORY}_tripdata_${YEAR}-${MONTH}.parquet"
            # Download the file (-f makes curl fail on HTTP errors so the check below works)
            echo "Downloading: $FILE_URL"
            curl -f -o "$OUTPUT_FILE" "$FILE_URL"
            # Check if the download was successful
            if [ $? -ne 0 ]; then
                echo "Failed to download: $FILE_URL"
            else
                echo "Downloaded successfully: $OUTPUT_FILE"
            fi
        done
    done
done
echo "Download process completed!"
For this exercise, I restricted downloads to data from 2023 to prevent request blocking. You can expand the range by modifying the START_YEAR
and END_YEAR
variables.
Before running the script, make it executable:
chmod +x taxi_download.sh
Then execute it:
./taxi_download.sh
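Optionally, confirm the files landed where expected (names and sizes will vary with the range you chose):
ls -lh taxi_data/yellow | head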
Adding data to the database using Python
Now that we’ve downloaded the NYC Taxi data, the next step is to load it into our PostgreSQL database using Python.
First, let’s import the necessary packages:
# For data manipulation
import pandas as pd
# For handling .parquet file
import pyarrow.parquet as pq
# For connecting to the PostgreSQL database
from sqlalchemy import create_engine
# For measuring runtime (optional, but handy!)
from time import time
Next, we’ll set up a connection to our PostgreSQL database:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()
This line creates a connection engine, allowing Python to communicate with the database using SQLAlchemy.
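As an optional sanity check (not part of the original walkthrough), you can run a trivial query through the engine to confirm the connection works:
from sqlalchemy import text

with engine.connect() as conn:
    print(conn.execute(text('SELECT version()')).scalar())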
Next, we’ll read the .parquet file containing the January 2023 yellow taxi data and convert it into a Pandas DataFrame:
ride_pq = pq.read_table('taxi_data/yellow/yellow_tripdata_2023-01.parquet')
ride = ride_pq.to_pandas()
Then, we will check the data types. To see the schema of our DataFrame in SQL terms, we use the following command:
print(pd.io.sql.get_schema(ride, 'yellow_taxi', con=engine))
The result is the SQL CREATE TABLE
statement, which defines how the data will look in our database:
CREATE TABLE yellow_taxi (
"VendorID" BIGINT,
tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE,
tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE,
passenger_count FLOAT(53),
trip_distance FLOAT(53),
"RatecodeID" FLOAT(53),
store_and_fwd_flag TEXT,
"PULocationID" BIGINT,
"DOLocationID" BIGINT,
payment_type BIGINT,
fare_amount FLOAT(53),
extra FLOAT(53),
mta_tax FLOAT(53),
tip_amount FLOAT(53),
tolls_amount FLOAT(53),
improvement_surcharge FLOAT(53),
total_amount FLOAT(53),
congestion_surcharge FLOAT(53),
airport_fee FLOAT(53)
)
Unlike in the lecture video, tpep_pickup_datetime and tpep_dropoff_datetime already have the correct type, TIMESTAMP WITHOUT TIME ZONE, rather than TEXT, so we can go ahead and add the data to the database.
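If you do work from the CSV files instead, a minimal sketch of the fix would be to parse those two columns with pandas before writing to the database:
# Only needed for the CSV files, where the datetime columns arrive as plain text
ride['tpep_pickup_datetime'] = pd.to_datetime(ride['tpep_pickup_datetime'])
ride['tpep_dropoff_datetime'] = pd.to_datetime(ride['tpep_dropoff_datetime'])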
Since the dataset is large, we’ll use pyarrow’s iter_batches() to stream the data into our database in chunks. Because this method is only available on the pyarrow.parquet.ParquetFile class, we will read the .parquet data as a ParquetFile object:
ride_parquet = pq.ParquetFile('taxi_data/yellow/yellow_tripdata_2023-01.parquet')
Then, we read streaming batches from the Parquet file and add them to the database batch by batch, writing to the yellow_taxi_data table.
iteration_count = 0
for batch in ride_parquet.iter_batches(batch_size=100000):
    t_start = time()
    batch_df = batch.to_pandas()

    # For the first batch, create (or replace) the table
    if iteration_count == 0:
        batch_df.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')
    # For the following batches, append
    else:
        batch_df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

    t_end = time()
    iteration_count += 1
    print("inserted another chunk, took %.3f seconds" % (t_end - t_start))
Once the data is loaded, let’s confirm how many rows were inserted into the database:
query = """
SELECT COUNT(1) FROM yellow_taxi_data
"""
pd.read_sql(query, con=engine)
For the January 2023 dataset, we get 3,066,766 rows! That’s a lot of taxi rides, and it’s all ready for analysis.
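With the table in place, you can already run simple analyses straight from pandas. For example (an illustrative query, not part of the original walkthrough):
query = """
SELECT
    DATE(tpep_pickup_datetime) AS pickup_date,
    COUNT(*) AS trips,
    ROUND(AVG(total_amount)::numeric, 2) AS avg_total
FROM yellow_taxi_data
GROUP BY 1
ORDER BY 1
LIMIT 7
"""
pd.read_sql(query, con=engine)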
4. Accessing the Database Using pgAdmin
Navigating databases can be much easier with a GUI tool like pgAdmin
. Here’s how you can set it up using Docker.
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
-p 8080:80 \
dpage/pgadmin4
Once the container is up, open your browser and navigate to http://localhost:8080. Log in using the email and password defined above.
To connect to PostgreSQL in pgAdmin:
- Click “Add New Server”
- Enter a name for your server
- In the “Connection” tab, enter:
  - Host name/address: localhost
  - Port: 5432
  - Username: root
  - Password: root
However, you will encounter an “Unable to connect” error. This happens because the pgAdmin and PostgreSQL containers are running separately and can’t communicate.
To address this, we will create a Docker network called pg-network.
docker network create pg-network
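You can confirm the network exists with an optional check:
docker network ls
docker network inspect pg-network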
We will run the postgres container on pg-network under the name pg-database:
# The -d option runs the container in detached mode so we get the terminal back
docker run -dit \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v /path/to/postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
--network=pg-network \
--name=pg-database \
postgres:13
And we run the pgadmin container on the same network under the name pgadmin:
docker run -dit \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
-p 8080:80 \
--network=pg-network \
--name=pgadmin \
dpage/pgadmin4
When connecting via pgAdmin
, replace localhost
with pg-database
as the Host name/address. Voilà! Everything should work perfectly now.
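If you want to double-check connectivity without pgAdmin, one option is a throwaway container on the same network (it will prompt for the “root” password):
docker run -it --rm \
--network=pg-network \
postgres:13 \
psql -h pg-database -U root -d ny_taxi -c '\dt'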
5. Dockerizing the Data Ingestion Step
Wouldn’t it be great to run the data ingestion process within Docker as well? Of course you can!
To do so, we will first write a Python script (ingest_data.py) that ingests the data into the postgres database.
### ingest_data.py ###
from sqlalchemy import create_engine
from time import time
import pandas as pd
import pyarrow.parquet as pq
import argparse


def main(params):
    # Define parameters
    user = params.user
    password = params.password
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    parquet_file = params.parquet_file

    # Connect to the database
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
    engine.connect()

    # Read the parquet file
    ride_parquet = pq.ParquetFile(parquet_file)

    # Add data to the database by batch
    iteration_count = 0
    for batch in ride_parquet.iter_batches(batch_size=100000):
        t_start = time()
        batch_df = batch.to_pandas()

        # For the first batch, create (or replace) the table
        if iteration_count == 0:
            batch_df.to_sql(name=table_name, con=engine, index=False, if_exists='replace')
        # For the following batches, append
        else:
            batch_df.to_sql(name=table_name, con=engine, index=False, if_exists='append')

        t_end = time()
        iteration_count += 1
        print("inserted another chunk, took %.3f seconds" % (t_end - t_start))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest parquet data to Postgres')
    parser.add_argument('--user', help='username for postgres')
    parser.add_argument('--password', help='password for postgres')
    parser.add_argument('--host', help='host for postgres')
    parser.add_argument('--port', help='port for postgres')
    parser.add_argument('--db', help='database name for postgres')
    parser.add_argument('--table_name', help='name of the table where the results will be written')
    parser.add_argument('--parquet_file', help='location of the parquet file')

    args = parser.parse_args()
    main(args)
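Before building the image, you can sanity-check the script from your host against the running Postgres container (assuming the January file sits at the path below):
python ingest_data.py \
--user=root \
--password=root \
--host=localhost \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_data \
--parquet_file=taxi_data/yellow/yellow_tripdata_2023-01.parquet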
Next, use this Dockerfile
to containerize the ingestion script:
FROM python:3.9.1
RUN pip install pandas sqlalchemy pyarrow psycopg2
WORKDIR /app
# Declare the mount point for the taxi data; the host directory is attached at
# run time with -v, since a Dockerfile VOLUME cannot reference host paths
VOLUME /data
COPY ingest_data.py ingest_data.py
ENTRYPOINT [ "python", "ingest_data.py" ]
Build the Docker container:
docker build -t taxi_ingest:v001 .
Then we run the container. Two details matter here: the taxi data directory is mounted into the container at /data, and the host is pg-database rather than localhost, because inside the container localhost refers to the container itself, not your machine:
docker run -it \
--network=pg-network \
-v /path/to/taxi_data:/data \
taxi_ingest:v001 \
--user=root \
--password=root \
--host=pg-database \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_data \
--parquet_file='/data/yellow/yellow_tripdata_2023-01.parquet'
And just like that, your data ingestion process is Dockerized!
6. Simplifying Everything with Docker Compose
Running multiple Docker containers can be tedious. We can make everything seamless using docker-compose
. It allows us to configure multiple containers in one file.
All we need to do is write a docker-compose.yaml file.
services:
  pg-database:
    image: postgres:13
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=ny_taxi
    volumes:
      # We can use a relative path in docker-compose.yaml
      - "./postgres_data:/var/lib/postgresql/data:rw"
    ports:
      - "5432:5432"
  pg-admin:
    image: dpage/pgadmin4
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@admin.com
      - PGADMIN_DEFAULT_PASSWORD=root
    ports:
      - "8080:80"
We do not need to do any network setup because all the containers in the docker-compose file will run under the same network.
Start everything with one command:
docker compose up
Stop all containers with:
docker compose down
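If you prefer to keep your terminal free, you can also start the stack in detached mode and check on it:
docker compose up -d
docker compose ps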
How cool is that?