I have a common pattern in my workflows where I have a 'primary' dataframe which I may need to subset to a portion of rows, update values and potentially add new columns, and then merge those subset rows back into the primary dataframe.
I have a function that takes the 'primary' dataframe (df1) and the subset dataframe (df2) and returns the merged result. It stacks the two frames, tags the rows that came from df2, and counts occurrences of each ID to decide which rows to keep. There may be millions of unique IDs, and df2 will never introduce new IDs.
The function can stay entirely lazy from input to output, which is desirable for the complex workflows I have. However, the function is slow due to the pl.len().over(id) on the stacked dataframe prior to filtering, which I have confirmed with some profiling.
    import polars as pl

    # function that updates existing rows in df1 given updated rows in df2
    def row_updater(df1, df2, id=pl.col("ID"), return_sorted=False, lazy=False):
        # add source dataframe column
        df1 = df1.lazy()
        df2 = df2.lazy().with_columns(df_id=True)

        # concatenate dataframes
        df_result_tmp = (
            pl.concat([df1, df2], how="diagonal_relaxed", parallel=True)
            .with_columns(
                # count number of occurrences per ID
                id_count=pl.len().over(id)
            )
            # filter to un-updated rows from df1 and updated rows from df2
            .filter(
                (pl.col("id_count") == 1)
                | ((pl.col("id_count") == 2) & pl.col("df_id"))
            )
            .drop("id_count", "df_id")
        )

        # sort output by ID if requested
        if return_sorted:
            df_result_tmp = df_result_tmp.sort(id)

        # return a lazyframe if requested
        if lazy:
            return df_result_tmp
        else:
            return df_result_tmp.collect().rechunk()

I have tried using an anti-join to identify the un-updated rows of df1 prior to concatenation, which is much faster. The exception is the case where every row in df1 is being updated by df2: the anti-join result is then empty, and Polars throws a panic exception when an empty frame is passed to pl.concat(). This could be avoided by collecting the frame prior to concatenation and checking its row count, but that removes the benefits of keeping the function entirely lazy. Similar workflows with df.update() also fail, as it calls pl.concat() internally.
    import polars as pl

    # function that updates existing rows in df1 given updated rows in df2
    def row_updater(df1, df2, id=pl.col("ID"), return_sorted=False, lazy=False):
        # convert both inputs to lazy frames
        df1 = df1.lazy()
        df2 = df2.lazy()

        # anti-join: keep rows in df1 whose id is NOT in df2
        # df1_remaining is potentially empty
        df1_remaining = df1.join(df2.select(id), on=id, how="anti")

        # concatenate dataframes
        # pl.concat errors when df1_remaining is empty
        df_result_tmp = pl.concat(
            [df1_remaining, df2], how="diagonal_relaxed", parallel=True
        )

        # sort output by ID if requested
        if return_sorted:
            df_result_tmp = df_result_tmp.sort(id)

        # return a lazyframe if requested
        if lazy:
            return df_result_tmp
        else:
            return df_result_tmp.collect().rechunk()