How To Make Parallel Processing Work In Python?

I am trying to do parallel processing in Python. I have a huge dataframe with more than 4M rows. As in the sample below, I would like to divide the dataframe (df will be divided into df1, df2) and apply the same set of transpose operations to each resultant dataframe. Thanks to Jezrael for helping me reach up to this level. Please find my input dataframe below:

df = pd.DataFrame({
'subject_id':[1,1,1,1,2,2,2,2,3,3,4,4,4,4,4],
'readings' : ['READ_1','READ_2','READ_1','READ_3','READ_1','READ_5','READ_6','READ_8','READ_10','READ_12','READ_11','READ_14','READ_09','READ_08','READ_07'],
'val' :[5,6,7,11,5,7,16,12,13,56,32,13,45,43,46],
})

Code to divide the dataframe:

N = 2  # number of subjects per chunk; with 4 distinct subjects this yields two dataframes
dfs = [x for _, x in df.groupby(pd.factorize(df['subject_id'])[0] // N)]  # dfs is a list of two dataframes
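To see what this split actually produces, here is a minimal self-contained check (the `readings` column is dropped for brevity; `val` uses placeholder values):

```python
import pandas as pd

df = pd.DataFrame({
    'subject_id': [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 4, 4, 4, 4, 4],
    'val': range(15),
})

N = 2
# factorize maps each distinct subject_id to 0, 1, 2, 3;
# integer division by N then buckets N subjects per chunk
codes = pd.factorize(df['subject_id'])[0] // N
dfs = [chunk for _, chunk in df.groupby(codes)]

print(len(dfs))  # 4 subjects / 2 per chunk -> 2 dataframes
print(sorted(dfs[0]['subject_id'].unique()))  # subjects 1 and 2
```

Note that `N` here is the number of subjects per chunk, not the number of chunks, so the chunk count scales with the number of distinct `subject_id` values.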

Parallel processing code:

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
results = []

def transpose_ope(df):                      #this function does the transformation like I want
    df_op = (df.groupby(['subject_id','readings'])['val']
            .describe()
            .unstack()
            .swaplevel(0,1,axis=1)
            .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    df_op = df_op.reset_index()

results.append(pool.map(transpose_ope, [df for df in dfs])) # am I storing the output correctly here?

Actually, I would like to append the output from each stage to one main dataframe.

Can you help me do this? My code keeps running forever, even for just 10-15 records.

Answer

The function you use in map needs to return the object you want.

I would also use the more idiomatic context manager for the pool.

EDIT: Fixed import

import multiprocessing as mp

def transpose_ope(df):                      #this function does the transformation like I want
    df_op = (df.groupby(['subject_id','readings'])['val']
            .describe()
            .unstack()
            .swaplevel(0,1,axis=1)
            .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    df_op = df_op.reset_index()
    return df_op


def main():
    with mp.Pool(mp.cpu_count()) as pool:
        res = pool.map(transpose_ope, dfs)  # res is a list of transformed dataframes
    return res

if __name__ == '__main__':
    main()

Not sure why you're appending a single list to another list... but if you just want a final list of `[transpose_ope(df) for df in dfs]`, `map` returns exactly that.
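To fold each worker's output back into one main dataframe, concatenate the list that `map` returns. A sketch of the combining step, using a stand-in `summarize` function instead of the real `transpose_ope` and running serially so it stays self-contained (with a real pool you would write `res = pool.map(summarize, dfs)` and then concatenate `res` the same way):

```python
import pandas as pd

# Hypothetical stand-in for transpose_ope: any function that
# returns a DataFrame combines the same way.
def summarize(df):
    return df.groupby('subject_id')['val'].mean().reset_index()

dfs = [
    pd.DataFrame({'subject_id': [1, 1], 'val': [5, 7]}),
    pd.DataFrame({'subject_id': [2, 2], 'val': [6, 8]}),
]

# pool.map would return [summarize(d) for d in dfs];
# concat stitches that list into the single "main" dataframe
results = [summarize(d) for d in dfs]
final = pd.concat(results, ignore_index=True)
print(final)
```

`ignore_index=True` gives the combined frame a clean 0..n-1 index instead of repeating each chunk's original index.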

source: stackoverflow.com