This post walks through setting up PostgreSQL with Docker and loading NYC Taxi trip record data into it. We’ll cover environment setup, data ingestion, and managing the containers with Docker Compose. The trip record data is available here.

1. Setting Up the Environment

To start off, I set up a virtual environment named de-zoomcamp using Anaconda:

conda create -n de-zoomcamp python=3.9.1
conda activate de-zoomcamp
conda install -c conda-forge pandas jupyter sqlalchemy pyarrow psycopg2 pgcli
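
To confirm the environment has what it needs, a quick check like the one below can help. It is only a sketch: the helper name missing_packages is hypothetical, and the import names are assumed to match the conda packages installed above.

```python
from importlib.util import find_spec

# Import names assumed to correspond to the conda packages installed above.
REQUIRED = ["pandas", "sqlalchemy", "pyarrow", "psycopg2", "pgcli"]


def missing_packages(names):
    """Return the names that cannot be imported in the current environment."""
    return [name for name in names if find_spec(name) is None]


missing = missing_packages(REQUIRED)
print("All packages found" if not missing else f"Missing: {missing}")
```

Run it inside the activated de-zoomcamp environment; anything it lists as missing can be installed with conda as shown above.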

With the environment ready, let’s move on to PostgreSQL.

2. Running PostgreSQL on Docker

We’ll use Docker to run PostgreSQL, an open-source relational database management system. Here’s the command:

docker run -it \
  -e POSTGRES_USER="root" \
  -e POSTGRES_PASSWORD="root" \
  -e POSTGRES_DB="ny_taxi" \
  -v /path/to/postgres_data:/var/lib/postgresql/data \
  -p 5432:5432 \
  postgres:13

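Here, the -e, -v, and -p options play specific roles:

  • -e sets an environment variable inside the container (here, the database user, password, and database name).
  • -v mounts a host directory into the container, so the database files persist after the container stops.
  • -p maps a host port to a container port (here, host port 5432 to container port 5432).

For more details on the Postgres Docker container, check here.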

With PostgreSQL running, let’s connect to it using the pgcli package. Run the following command in a new terminal session after activating the virtual environment:

pgcli -h localhost -p 5432 -u root -d ny_taxi 

You’ll be prompted to enter a password; use “root” as specified earlier. Once connected, run \dt to list tables. At this stage, no tables exist, so let’s proceed to add the NYC Taxi data to our database.

3. Adding NYC Taxi Data to the Postgres Database

Downloading ride data

First, I will download the NYC Taxi dataset from the NYC website. Data is available from 2009 onward and is now provided in the .parquet format rather than the .csv format used in the lecture videos. Parquet is a columnar storage format that compresses data well, which reduces storage requirements. So, I made slight adjustments to the original code to accommodate this format.

The dataset is divided into four categories based on different types of rides. As a former New Yorker, I will give a brief overview of each:

  1. Yellow Taxi: The iconic yellow cabs you’ve likely seen in movies and TV shows.
  2. Green Taxi: Known as “boro taxis”. These serve upper Manhattan (above 96th St) and the boroughs outside Manhattan.
  3. For-hire vehicle: Includes services like limousines.
  4. High-volume for-hire vehicle: Primarily ride-sharing services such as Uber and Lyft.

Since the data is split by month and category, I wrote a bash script named taxi_download.sh to automate downloading for a selected range of years and months.

#!/bin/bash
### taxi_download.sh ###

# Define the base URL
BASE_URL="https://d37ci6vzurychx.cloudfront.net/trip-data"

# Define the output directory
OUTDIR="taxi_data"
# Specify the range of years, months, and categories
START_YEAR=2023
END_YEAR=2023
MONTHS=("01" "02" "03" "04" "05" "06" "07" "08" "09" "10" "11" "12")
CATEGORIES=("yellow" "green" "fhv" "fhvhv")

mkdir -p "$OUTDIR"

# Loop through each category, year, and month
for CATEGORY in "${CATEGORIES[@]}"; do
    # Create the per-category folder inside the output directory
    mkdir -p "${OUTDIR}/${CATEGORY}"
    for YEAR in $(seq $START_YEAR $END_YEAR); do
        for MONTH in "${MONTHS[@]}"; do
            # Construct the file URL and output file name
            FILE_URL="${BASE_URL}/${CATEGORY}_tripdata_${YEAR}-${MONTH}.parquet"
            OUTPUT_FILE="${OUTDIR}/${CATEGORY}/${CATEGORY}_tripdata_${YEAR}-${MONTH}.parquet"

            # Download the file (-f makes curl exit with an error code on HTTP errors)
            echo "Downloading: $FILE_URL"
            curl -f -o "$OUTPUT_FILE" "$FILE_URL"

            # Check if the download was successful
            if [ $? -ne 0 ]; then
                echo "Failed to download: $FILE_URL"
            else
                echo "Downloaded successfully: $OUTPUT_FILE"
            fi
        done
    done
done

echo "Download process completed!"

For this exercise, I restricted downloads to data from 2023 to prevent request blocking. You can expand the range by modifying the START_YEAR and END_YEAR variables.

Before running the script, make it executable:

chmod +x taxi_download.sh

Then execute it:

./taxi_download.sh
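
To preview which files a wider range would request before running the script, the URL pattern can be reproduced in a few lines of Python. This is a sketch only: build_urls is a hypothetical helper, it follows the same naming pattern as the script above, and it builds URLs without downloading anything.

```python
from itertools import product

BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"


def build_urls(categories, years, months):
    """Return one parquet URL per (category, year, month) combination."""
    return [
        f"{BASE_URL}/{category}_tripdata_{year}-{month:02d}.parquet"
        for category, year, month in product(categories, years, months)
    ]


# Two categories, two years, twelve months each
urls = build_urls(["yellow", "green"], range(2022, 2024), range(1, 13))
print(len(urls))  # 48
print(urls[0])
```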

Adding data to the database using Python

Now that we’ve downloaded the NYC Taxi data, the next step is to load it into our PostgreSQL database using Python.

First, let’s import the necessary packages:

# For data manipulation
import pandas as pd

# For handling .parquet file
import pyarrow.parquet as pq

# For connecting to the PostgreSQL database
from sqlalchemy import create_engine

# For measuring runtime (optional, but handy!)
from time import time

Next, we’ll set up a connection to our PostgreSQL database:

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

This line creates a connection engine, allowing Python to communicate with the database using SQLAlchemy.
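
The connection string follows the pattern postgresql://user:password@host:port/database. The password “root” needs no special handling, but a password containing characters such as @ or / would break the URL unless it is percent-encoded. A small sketch, assuming a hypothetical helper named postgres_url:

```python
from urllib.parse import quote


def postgres_url(user, password, host, port, db):
    """Build a postgresql:// URL, percent-encoding the user and password."""
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"postgresql://{user_enc}:{password_enc}@{host}:{port}/{db}"


print(postgres_url("root", "root", "localhost", 5432, "ny_taxi"))
# postgresql://root:root@localhost:5432/ny_taxi

# A made-up password with special characters, for illustration only
print(postgres_url("root", "p@ss/word", "localhost", 5432, "ny_taxi"))
# postgresql://root:p%40ss%2Fword@localhost:5432/ny_taxi
```

The returned string can be passed to create_engine in place of the hard-coded one.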

Next, we’ll read the .parquet file with the January 2023 yellow taxi rides and convert it into a Pandas DataFrame:

ride_pq = pq.read_table('taxi_data/yellow/yellow_tripdata_2023-01.parquet')
ride = ride_pq.to_pandas()

Then, we will check the data types. To see the schema of our DataFrame in SQL terms, we use the following command:

print(pd.io.sql.get_schema(ride, 'yellow_taxi', con=engine))

The result is the SQL CREATE TABLE statement, which defines how the data will look in our database:

CREATE TABLE yellow_taxi (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)

Unlike in the lecture video, tpep_pickup_datetime and tpep_dropoff_datetime already have the correct type, TIMESTAMP WITHOUT TIME ZONE, rather than TEXT, because Parquet files store column types. So we can go ahead and add the data to the database.

Since the dataset is large, we’ll use pyarrow’s iter_batches() to stream the data into our database in chunks. This method is only available on the pyarrow.parquet.ParquetFile class, so we open the .parquet file as a ParquetFile:

ride_parquet = pq.ParquetFile('taxi_data/yellow/yellow_tripdata_2023-01.parquet')

Then, we read streaming batches from a Parquet file and add to the database by batch. We will add the data to yellow_taxi_data table in the database.

iteration_count = 0
for batch in ride_parquet.iter_batches(batch_size=100000):
    t_start = time()
    batch_df = batch.to_pandas()
    # First batch: create the table, replacing any existing one
    if iteration_count == 0:
        batch_df.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')
    # Following batches: append to the table
    else:
        batch_df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    t_end = time()
    iteration_count += 1
    print("inserted another chunk, took %.3f seconds" % (t_end - t_start))
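
The replace-then-append pattern can be tried without a running Postgres instance. The sketch below is a stand-in, not the method used in this post: it uses Python’s built-in sqlite3 module instead of Postgres, plain tuples instead of DataFrames, and made-up sample rows. The helper name load_in_batches and the two-column table are hypothetical.

```python
import sqlite3


def load_in_batches(conn, table, rows, batch_size):
    """Insert (pickup, distance) rows batch by batch, recreating the table first."""
    inserted = 0
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        if start == 0:
            # First batch: drop and recreate, similar in spirit to if_exists='replace'
            conn.execute(f"DROP TABLE IF EXISTS {table}")
            conn.execute(f"CREATE TABLE {table} (pickup TEXT, distance REAL)")
        # Every batch, including the first, is inserted into the table
        conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", batch)
        conn.commit()
        inserted += len(batch)
    return inserted


conn = sqlite3.connect(":memory:")
# Made-up sample rows, for illustration only
rows = [(f"2023-01-01 00:{i:02d}:00", float(i)) for i in range(25)]
load_in_batches(conn, "yellow_taxi_data", rows, batch_size=10)
print(conn.execute("SELECT COUNT(1) FROM yellow_taxi_data").fetchone()[0])  # 25
```

Running the load a second time leaves the count unchanged, because the first batch recreates the table instead of adding to it.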

Once the data is loaded, let’s confirm how many rows were inserted into the database:

query = """
SELECT COUNT(1) FROM yellow_taxi_data
"""
pd.read_sql(query, con=engine)

For the January 2023 dataset, we get 3,066,766 rows! That’s a lot of taxi rides, and it’s all ready for analysis.
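
As a sanity check on the loop output, the row count tells us roughly how many “inserted another chunk” lines to expect. The arithmetic below assumes every batch except the last is full; since batch_size is an upper bound, the actual number of iterations may be somewhat higher.

```python
import math

total_rows = 3_066_766  # row count reported above
batch_size = 100_000    # batch size used in the loop

n_batches = math.ceil(total_rows / batch_size)
last_batch_rows = total_rows - (n_batches - 1) * batch_size

print(n_batches)        # 31
print(last_batch_rows)  # 66766
```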

4. Accessing the Database Using pgAdmin

Navigating databases can be much easier with a GUI tool like pgAdmin. Here’s how you can set it up using Docker.

docker run -it \
  -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
  -e PGADMIN_DEFAULT_PASSWORD="root" \
  -p 8080:80 \
  dpage/pgadmin4

Once the container is up, open your browser and navigate to http://localhost:8080. Log in using the email and password defined above.

To connect to PostgreSQL in pgAdmin:

  1. Click “Add New Server”
  2. Enter a name for your server
  3. In the “Connection” tab, input:
    • Host name/address: localhost
    • Port: 5432
    • Username: root
    • Password: root

However, you will encounter an “Unable to connect” error. This happens because pgAdmin and PostgreSQL run in separate containers: inside the pgAdmin container, localhost refers to the pgAdmin container itself, not to the one running PostgreSQL.

To address this, we will create a Docker network called pg-network:

docker network create pg-network

Next, stop the PostgreSQL container started earlier (it still holds port 5432) and run it again in pg-network under the name pg-database:

# The -d option runs the container in the background, so we get the terminal back
docker run -dit \
  -e POSTGRES_USER="root" \
  -e POSTGRES_PASSWORD="root" \
  -e POSTGRES_DB="ny_taxi" \
  -v /path/to/postgres_data:/var/lib/postgresql/data \
  -p 5432:5432 \
  --network=pg-network \
  --name=pg-database \
  postgres:13

Likewise, stop the earlier pgAdmin container and run it again in the same network under the name pgadmin:

docker run -dit \
  -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
  -e PGADMIN_DEFAULT_PASSWORD="root" \
  -p 8080:80 \
  --network=pg-network \
  --name=pgadmin \
  dpage/pgadmin4

When connecting via pgAdmin, replace localhost with pg-database as the Host name/address. Voilà! Everything should work perfectly now.
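
When a connection fails, it helps to check whether the host and port are reachable at all from where you are connecting. The sketch below uses only the standard library; can_connect is a hypothetical helper, and it only tests that the TCP port accepts connections, not that the credentials are valid.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names
        return False


# From the host machine, the published port is checked on localhost.
# From a container on pg-network, use the container name "pg-database" instead.
print(can_connect("localhost", 5432))
```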

5. Dockerizing the Data Ingestion Step

Wouldn’t it be great to run the data ingestion process within Docker as well? Of course you can do that!

To do so, we will first write a Python script (ingest_data.py) that ingests the data into the Postgres database.

### ingest_data.py ###

from sqlalchemy import create_engine
from time import time

import pandas as pd
import pyarrow.parquet as pq
import argparse

def main(params):
    # Unpack the command-line parameters
    user = params.user
    password = params.password
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    parquet_file = params.parquet_file

    # Connect to the database
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
    engine.connect()

    # Open the parquet file so it can be read in batches
    ride_parquet = pq.ParquetFile(parquet_file)

    # Add data to the database batch by batch
    iteration_count = 0
    for batch in ride_parquet.iter_batches(batch_size=100000):
        t_start = time()
        batch_df = batch.to_pandas()
        # First batch: create the table, replacing any existing one
        if iteration_count == 0:
            batch_df.to_sql(name=table_name, con=engine, index=False, if_exists='replace')
        # Following batches: append to the table
        else:
            batch_df.to_sql(name=table_name, con=engine, index=False, if_exists='append')
        t_end = time()
        iteration_count += 1
        print("inserted another chunk, took %.3f seconds" % (t_end - t_start))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest Parquet data to Postgres')

    parser.add_argument('--user', help='username for postgres')
    parser.add_argument('--password', help='password for postgres')
    parser.add_argument('--host', help='host for postgres')
    parser.add_argument('--port', help='port for postgres')
    parser.add_argument('--db', help='database name for postgres')
    parser.add_argument('--table_name', help='name of the table where the results will be written')
    parser.add_argument('--parquet_file', help='location of the parquet file')
    args = parser.parse_args()
    main(args)
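
To see how argparse turns the command-line flags into the params object, the parser can be exercised with a list of strings instead of real command-line input. The variant below is a hypothetical sketch, not the parser from ingest_data.py: it marks the flags as required and parses the port as an integer with a default of 5432, both of which are my assumptions.

```python
import argparse


def build_parser():
    """Hypothetical parser variant with required flags and an integer port."""
    parser = argparse.ArgumentParser(description="Ingest Parquet data to Postgres")
    for flag in ("--user", "--password", "--host", "--db", "--table_name", "--parquet_file"):
        parser.add_argument(flag, required=True)
    parser.add_argument("--port", type=int, default=5432)
    return parser


args = build_parser().parse_args([
    "--user", "root", "--password", "root", "--host", "localhost",
    "--db", "ny_taxi", "--table_name", "yellow_taxi_data",
    "--parquet_file", "taxi_data/yellow/yellow_tripdata_2023-01.parquet",
])
print(args.port, args.table_name)  # 5432 yellow_taxi_data
```

With required=True, a missing flag stops the script with a usage message instead of failing later when the connection string is built.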

Next, use this Dockerfile to containerize the ingestion script:

FROM python:3.9.1
RUN pip install pandas sqlalchemy pyarrow psycopg2 
WORKDIR /app
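# /data is the mount point for the parquet files. A Dockerfile cannot reference a
# host path, so bind-mount the host folder at run time: -v /path/to/taxi_data:/data
VOLUME /data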
COPY ingest_data.py ingest_data.py
ENTRYPOINT [ "python" , "ingest_data.py"]

Build the Docker image:

docker build -t taxi_ingest:v001 .

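Then we run the container. The -v option mounts the folder with the downloaded files at /data, and --host is pg-database because, inside pg-network, localhost would point to the ingestion container itself:

docker run -it \
  --network=pg-network \
  -v /path/to/taxi_data:/data \
  taxi_ingest:v001 \
    --user=root \
    --password=root \
    --host=pg-database \
    --port=5432 \
    --db=ny_taxi \
    --table_name=yellow_taxi_data \
    --parquet_file='/data/yellow/yellow_tripdata_2023-01.parquet'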
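
If you want to ingest several files, the docker run command can be assembled in Python and passed to subprocess. The sketch below only builds and prints the command. It rests on several assumptions: Postgres is reachable as pg-database on pg-network (as set up in section 4), the host folder /path/to/taxi_data is mounted at /data, and ingest_command and the table name green_taxi_data are hypothetical.

```python
import shlex


def ingest_command(parquet_file, table_name, data_dir="/path/to/taxi_data"):
    """Return the docker run argument list for one ingestion run."""
    return [
        "docker", "run", "--rm",
        "--network=pg-network",
        "-v", f"{data_dir}:/data",
        "taxi_ingest:v001",
        # Everything after the image name is passed to ingest_data.py
        "--user=root", "--password=root",
        "--host=pg-database", "--port=5432", "--db=ny_taxi",
        f"--table_name={table_name}",
        f"--parquet_file={parquet_file}",
    ]


cmd = ingest_command("/data/green/green_tripdata_2023-01.parquet", "green_taxi_data")
print(shlex.join(cmd))
# To execute it: subprocess.run(cmd, check=True)
```

Keep in mind that each run replaces the target table, because the script writes its first batch with if_exists='replace'. Loading several months into one table would need a change to the script.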

And just like that, your data ingestion process is Dockerized!

6. Simplifying Everything with Docker Compose

Running multiple Docker containers by hand can be tedious. Docker Compose makes this easier by letting us configure multiple containers in one file.

All we need to do is write the docker-compose.yaml file:

services:
  pg-database:
    image: postgres:13
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=ny_taxi
    volumes:
      # We can use a relative path in docker-compose.yaml
      - "./postgres_data:/var/lib/postgresql/data:rw"
    ports:
      - "5432:5432"
  pg-admin:
    image: dpage/pgadmin4
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@admin.com
      - PGADMIN_DEFAULT_PASSWORD=root
    ports:
      - "8080:80"

We do not need to do any network setup because Docker Compose puts all the services in the file on the same network, where each one is reachable by its service name. In pgAdmin, use pg-database as the Host name/address.

Start everything with one command:

docker compose up

Stop all containers with:

docker compose down

How cool is that?