PostGIS Multiprocessing Code Review

As promised, let’s walk through the code I wrote to parallelize my Postgres/PostGIS task with Python’s multiprocessing module. You may recall that I said it only took 4 lines of code. Well, sort of: the parallel processing really was that easy, but the script around it is longer. Here is the full code:

import psycopg2, multiprocessing, time
from multiprocessing import pool

def pg_run(thetable):
    conn = psycopg2.connect(dbname = 'bigdata',host='127.0.0.1',user='postgres',password='postgres', port=5432)
    conn.autocommit = True
    cur = conn.cursor()
    thesql = """INSERT INTO sumtable (geom, sumfare, zone)
SELECT taxi_zones.geom, sumfare, a.zone
FROM taxi_zones, 
(SELECT taxi_zones.zone, count(taxipickup.geom) AS sumfare
FROM taxi_zones
JOIN """ + thetable + """ AS taxipickup
ON ST_Contains(taxi_zones.geom, taxipickup.geom) 
GROUP BY taxi_zones.zone) AS a
WHERE taxi_zones.zone = a.zone;
"""
    cur.execute(thesql)

if __name__ == '__main__':
    t1 = time.time()
    p = multiprocessing.Pool(3)
    thetable1 = 'yellow_tripdata_2012_08'
    thetable2 = 'yellow_tripdata_2012_09'
    thetable3 = 'yellow_tripdata_2012_10'
    thetable4 = 'yellow_tripdata_2012_11'

    myresult = p.map(pg_run,[thetable2, thetable3, thetable4])

    thesql = """SELECT sum(sumfare) as numrides, zone
INTO sumtable2
FROM sumtable
GROUP BY zone"""
    conn = psycopg2.connect(dbname = 'bigdata',host='127.0.0.1',user='postgres',password='postgres', port=5432)
    conn.autocommit = True
    cur = conn.cursor()
    cur.execute("DROP TABLE IF EXISTS sumtable2")
    cur.execute(thesql)
    print (time.time() - t1)


Wait! You said it was only 4 lines. The reality is that a lot of that code is “filler” (the database connection, the SQL, and the timing). Here is everything necessary for the parallel processing:

## We first have to import the multiprocessing module.
import psycopg2, multiprocessing, time

## I'm also importing the pool submodule (not strictly needed, since
## the code calls multiprocessing.Pool, which runs separate processes):
from multiprocessing import pool
.
.
## after a bunch of python code, I'm telling multiprocessing to
## create one pool of 3 worker processes, which the operating
## system can run on 3 separate CPU cores.

p = multiprocessing.Pool(3)
.
.
## Now, we run the multiprocessing. Since the pool has 3 workers,
## I pass map my function plus a list of 3 table names, and each
## table is queried in its own process:

p.map(pg_run,[thetable2, thetable3, thetable4])

Yep, that’s it: the two imports, the call that creates the pool, and the call to p.map. Now, there is other code for sure, and I’ll describe some of it. Using psycopg2 (which allows us to connect to Postgres), I create a function to run the SQL (I put comments in the code so you can see my logic):

## Here is the function. I call it pg_run, and it takes a table name (e.g. yellow_tripdata_2012_10).

def pg_run(thetable):
## in the function, each worker process opens its own connection
## to Postgres using psycopg2 (connections should not be shared
## between processes).
    conn = psycopg2.connect(dbname = 'bigdata',host='127.0.0.1',user='postgres',password='postgres', port=5432)
    conn.autocommit = True
    cur = conn.cursor()

## This is the SQL code from my previous post. The only
## difference is that the table name comes from the variable
## thetable, concatenated into the string. The query counts the
## pickups in each zone (despite its name, sumfare is a count)
## and inserts the grouped results into a table called sumtable.

    thesql = """INSERT INTO sumtable (geom, sumfare, zone)
SELECT taxi_zones.geom, sumfare, a.zone
FROM taxi_zones, 
(SELECT taxi_zones.zone, count(taxipickup.geom) AS sumfare
FROM taxi_zones
JOIN """ + thetable + """ AS taxipickup
ON ST_Contains(taxi_zones.geom, taxipickup.geom) 
GROUP BY taxi_zones.zone) AS a
WHERE taxi_zones.zone = a.zone;
"""
    cur.execute(thesql)
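
Concatenating thetable straight into the SQL works for hard-coded table names, but anything unusual in a name ends up inside the query text itself. psycopg2 has a module for composing queries safely (for example sql.SQL(...).format(sql.Identifier(thetable)) from psycopg2.sql). As a dependency-free sketch of the same idea, assuming a hypothetical helper called build_insert_sql, the table name can be quoted the way PostgreSQL quotes identifiers: wrap it in double quotes and double any embedded double quote.

```python
def quote_ident(name):
    # PostgreSQL identifier quoting: wrap the name in double quotes and
    # double any embedded double quote, so the name cannot end early.
    return '"' + name.replace('"', '""') + '"'


def build_insert_sql(thetable):
    # Hypothetical helper: the same INSERT ... SELECT as pg_run, except the
    # table name is quoted as an identifier instead of pasted in raw.
    return """INSERT INTO sumtable (geom, sumfare, zone)
SELECT taxi_zones.geom, sumfare, a.zone
FROM taxi_zones,
(SELECT taxi_zones.zone, count(taxipickup.geom) AS sumfare
FROM taxi_zones
JOIN """ + quote_ident(thetable) + """ AS taxipickup
ON ST_Contains(taxi_zones.geom, taxipickup.geom)
GROUP BY taxi_zones.zone) AS a
WHERE taxi_zones.zone = a.zone;
"""


print(build_insert_sql('yellow_tripdata_2012_09'))
```

Inside pg_run this would become cur.execute(build_insert_sql(thetable)). Keep in mind that quoted identifiers are case-sensitive, so this only matches the existing tables because their names are all lowercase.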

Now, if this is all there is, we could easily call the function by issuing something like this:

pg_run(thetable)

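and run the SQL query in Postgres. But remember, we want to spawn multiple processes, and the next chunk of code handles that. This is the main portion of the program, and it sits under if __name__ == '__main__': for a reason: on platforms where multiprocessing starts workers with the “spawn” method (Windows, and macOS on recent Python versions), each worker re-imports the script, and without the guard every worker would try to create a pool of its own, which multiprocessing refuses with an error: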

if __name__ == '__main__':
    t1 = time.time()
    p = multiprocessing.Pool(3)
    thetable1 = 'yellow_tripdata_2012_08'
    thetable2 = 'yellow_tripdata_2012_09'
    thetable3 = 'yellow_tripdata_2012_10'
    thetable4 = 'yellow_tripdata_2012_11'

    myresult = p.map(pg_run,[thetable2, thetable3, thetable4])

As I said before, multiprocessing.Pool(3) sets up one pool with 3 worker processes. The next four lines just define table names (thetable1 is never used). Then p.map runs the function in the pool. Pool.map takes the function and an iterable of arguments, calls the function once per element in one of the workers, and returns the results as a list in the same order. In my case, the pool has 3 workers and I send it 3 table names, so pg_run runs in 3 separate processes at the same time. Since pg_run doesn’t return anything, myresult is just [None, None, None].
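
To see what Pool.map does without a database, here is a minimal sketch, assuming a hypothetical stand-in worker called fake_pg_run that sleeps for half a second instead of running the query (run_parallel is also a name made up for this example). It compares a plain loop with Pool.map, and it uses the pool as a context manager so the worker processes are cleaned up when the block ends.

```python
import multiprocessing
import os
import time


def fake_pg_run(table_name):
    # Hypothetical stand-in for pg_run: instead of querying Postgres,
    # sleep briefly and report which process handled the table.
    time.sleep(0.5)
    return table_name, os.getpid()


def run_parallel(tables, workers=3):
    # One pool with `workers` processes. map() hands each table name to a
    # worker and returns the results in the same order as `tables`.
    # Leaving the with-block terminates the pool's worker processes.
    with multiprocessing.Pool(workers) as pool:
        return pool.map(fake_pg_run, tables)


if __name__ == '__main__':
    tables = ['yellow_tripdata_2012_09',
              'yellow_tripdata_2012_10',
              'yellow_tripdata_2012_11']

    t1 = time.perf_counter()
    serial = [fake_pg_run(t) for t in tables]   # one table after another
    t2 = time.perf_counter()
    parallel = run_parallel(tables)              # up to 3 tables at once
    t3 = time.perf_counter()

    print('serial:   %.2f s' % (t2 - t1))
    print('parallel: %.2f s' % (t3 - t2))
    for table, pid in parallel:
        print(table, 'handled by PID', pid)
```

The number of table names does not have to match the number of workers: with more tables than workers, the extra ones wait until a worker is free. In the real script each worker also opens its own connection, so each query runs in a separate Postgres backend process, and the speed-up depends on the database server having spare cores for those queries.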

Now, each worker’s INSERT adds its per-zone counts to the same table, sumtable. Since we are doing this 3 times (once for each month), a zone can end up with up to 3 rows. So, I need another query that sums those rows by zone and puts the totals into a new table, sumtable2:

thesql = """SELECT sum(sumfare) as numrides, zone
INTO sumtable2
FROM sumtable
GROUP BY zone"""
    conn = psycopg2.connect(dbname = 'bigdata',host='127.0.0.1',user='postgres',password='postgres', port=5432)
    conn.autocommit = True
    cur = conn.cursor()
    cur.execute("DROP TABLE IF EXISTS sumtable2")
    cur.execute(thesql)
    print (time.time() - t1)

And of course, I’m printing out the time it took to complete.  
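
Here is the second query’s logic as a small Python sketch, using toy rows shaped like sumtable (the zone names and counts are made up for illustration, not real results, and sum_by_zone is a hypothetical name). Each worker contributes at most one row per zone, so a zone can appear up to three times, and grouping by zone adds those rows into a single total.

```python
from collections import Counter

# Toy (zone, sumfare) rows shaped like sumtable after the three workers ran.
# Values are made up for illustration only.
sumtable_rows = [
    ('zone_a', 10), ('zone_b', 4),   # from the 2012_09 worker
    ('zone_a', 12), ('zone_b', 5),   # from the 2012_10 worker
    ('zone_a', 9),                   # from the 2012_11 worker (no zone_b pickups)
]


def sum_by_zone(rows):
    # Python equivalent of:
    #   SELECT sum(sumfare) AS numrides, zone FROM sumtable GROUP BY zone
    totals = Counter()
    for zone, sumfare in rows:
        totals[zone] += sumfare
    return dict(totals)


print(sum_by_zone(sumtable_rows))  # → {'zone_a': 31, 'zone_b': 9}
```

One practical consequence of this design: the workers only INSERT into sumtable, and nothing in the script empties it, so running the script twice counts every pickup twice unless sumtable is cleared in between (for example with TRUNCATE sumtable before p.map).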

So, this shows that if you can write a Python function that talks to Postgres, then with about 4 lines of code you can run it in parallel.
