Common Transforms Solved In Pyspark, SQL and Pandas
In the below videos, we look at the ROW_NUMBER(), PARTITION BY and DENSE_RANK() functionality:
- SQL Basics: How To Use ROW_NUMBER() and Why
- SQL Basics: How To Use PARTITION BY and Why
- SQL Basics: How To Use DENSE_RANK() and Why
Some questions that are answered in these videos:
- In the examples, we’re using new tables that have data in them to demonstrate the ROW_NUMBER() functionality along with using some tables that we’ve used in other examples.
- When we review the examples, what happens if we have multiple columns that we order by? How would we specify that if we wanted Column1 to come before Column2?
- What can we pass into the ROW_NUMBER() functionality that allows us to invert the order of a data set? How could this be useful?
- Think of a math question that ROW_NUMBER() would help us solve. Any question that involves first ordering data (ie: median, mode as simple examples) will highlight where this functionality will be useful.
- In the examples, we’ll notice that we’re using a table with mixed values: some completely new in one column and another column with duplicates. These demos are live, so we are using live data.
- What is the difference between partitioning by the column ID and the column Letter?
- What happens when we insert a new record that is a duplicate in ID, Letter, and Val? How does the PARTITION BY with ROW_NUMBER() function here?
- While people tend to think of PARTITION BY as helpful to remove duplicates, how would this functionality possibly help us with scaling horizontally and why?
- You’ll notice that the table being used has merchants by name and product ids associated with each merchant. As a note, we would normally use an associative table here, but for the sake of this video, we are using this contrived example, as it helps to illustrate how we can use DENSE_RANK().
- In the video, both ROW_NUMBER() and DENSE_RANK() are compared side-by-side. Using merchant AAA as an example, how do these two return different values?
- Suppose we had 100 home sales across three states (Oklahoma, Kentucky and North Carolina): what would be the maximum DENSE_RANK() value if we ranked by state? What would be the maximum ROW_NUMBER()? (See the sketch after this list.)
- In considering “groups” and “ordering”, what would we use for “ordering” and what would we use for “grouping” when comparing ROW_NUMBER() or DENSE_RANK()?
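To reason about the home-sales question above, here is a minimal pandas sketch; the per-state row counts are made up purely for illustration and are not from the videos:
import pandas as pd

# Hypothetical data: 100 home sales spread across three states.
sales = pd.DataFrame({
    "State": ["Oklahoma"] * 40 + ["Kentucky"] * 35 + ["North Carolina"] * 25
})
# ROW_NUMBER() analog: every row gets its own number, so the maximum is the row count (100).
sales["RowNumber"] = sales.index + 1
# DENSE_RANK() analog over State: one rank per distinct state, so the maximum is 3.
sales["DenseRankByState"] = sales["State"].rank(method="dense").astype(int)
print(sales["RowNumber"].max(), sales["DenseRankByState"].max())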
Test Data For Our Pyspark, SQL and Pandas Code
For our next few examples, we’ll use a different data set, as we’ll apply the above transforms along with some data-level transformations. The below code blocks relate to either Pyspark or Pandas.
Pyspark Test Data
tableFullData = sc.parallelize([
{"Alid": "A", "dtOne": "2023-01-01", "dtTwo": "1-1-2023", "dtThree": "1/1/2023", "iField": 10, "vField": "7"},
{"Alid": "B", "dtOne": "2023-02-01", "dtTwo": "2-1-2023", "dtThree": "2/1/2023", "iField": 15, "vField": "9"},
{"Alid": "B", "dtOne": "2023-02-01", "dtTwo": "2-1-2023", "dtThree": "2/1/2023", "iField": 15, "vField": "9"},
{"Alid": "C", "dtOne": "2023-03-01", "dtTwo": "3-1-2023", "dtThree": "3/1/2023", "iField": 12, "vField": "4"},
{"Alid": "D", "dtOne": "2023-01-07", "dtTwo": "2-1-2023", "dtThree": "2/5/2023", "iField": 101, "vField": "10"},
{"Alid": "E", "dtOne": "2023-04-01", "dtTwo": "4-1-2023", "dtThree": "4/1/2023", "iField": 22, "vField": "A"}
])
tableFullDataDf = spark.createDataFrame(tableFullData)
tableFullDataDf.createOrReplaceTempView("tableFullDataDf") ### For Spark SQL
Pandas Test Data
tableFull = {
"Alid": ["A","B","B","C","D","E"],
"dtOne": ["2023-01-01","2023-02-01","2023-02-01","2023-03-01","2023-01-07","2023-04-01"],
"dtTwo": ["1-1-2023","2-1-2023","2-1-2023","3-1-2023","2-1-2023","4-1-2023"],
"dtThree": ["1/1/2023","2/1/2023","2/1/2023","3/1/2023","2/5/2023","4/1/2023"],
"iField": [10, 15, 15, 12, 101, 22],
"vField": ["7","9","9","4","10","A"],
}
tableFullDf = pd.DataFrame(tableFull)
Remember that you can convert between the two: a pandas dataframe can be created from a pyspark dataframe and vice versa (ie: pandas dataframe to pyspark dataframe).
You can take these dataframes and edit them, or even make your own, to create contrived data sets for unit testing. This really helps if you develop using test-driven development. One advantage of test-driven development that sometimes gets overlooked is how quickly you can verify that your functionality works as expected. When you deal with development environments that hold 1TB of data, using a small data set to test functionality can be a huge time saver, since even the development environment is large (and in most cases, a development environment should closely mirror the production environment).
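As a quick illustration of that workflow, below is a minimal sketch of a unit test over a contrived pandas data set; the test name and the expectation (that Alid "B" is the only duplicated key) are assumptions made for demonstration rather than anything from the post:
import pandas as pd

def test_duplicate_alid_rows():
    # Tiny contrived data set mirroring the test data above.
    df = pd.DataFrame({
        "Alid": ["A", "B", "B", "C", "D", "E"],
        "iField": [10, 15, 15, 12, 101, 22],
    })
    # With a data set this small, the expected result is easy to reason about:
    # "B" should appear twice and every other Alid exactly once.
    counts = df.groupby("Alid").size()
    assert counts["B"] == 2
    assert (counts.drop("B") == 1).all()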
Pyspark
Let’s take our pyspark dataframe and use the functionality of ROW_NUMBER(), PARTITION BY and DENSE_RANK() along with editing some of our data values. We’ll look at each of these transformed results one-by-one:
from pyspark.sql import functions
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, date_format, expr
# Window for ROW_NUMBER() over the whole data set, ordered by Alid.
ordering_id = Window.orderBy("Alid")
# Window for ROW_NUMBER() within each Alid partition (mirrors PARTITION BY Alid ORDER BY Alid).
partitioning = Window.partitionBy("Alid").orderBy("Alid")
# Window for DENSE_RANK() over the whole data set, ordered by dtOne.
ordering_date = Window.orderBy("dtOne")
updatedDf = tableFullDataDf \
.withColumn("RowNumberBy_Alid", functions.row_number().over(ordering_id)) \
.withColumn("PartitionBy_Alid", functions.row_number().over(partitioning)) \
.withColumn("DenseRankBy_dtOne", functions.dense_rank().over(ordering_date)) \
.select(
"Alid"
,"RowNumberBy_Alid"
,"PartitionBy_Alid"
,"dtOne"
,"DenseRankBy_dtOne"
,"dtTwo"
, date_format(to_date("dtTwo","M-d-yyyy"), "yyyy-MM-dd").alias("dtTwo_formatted")
,"dtThree"
, date_format(to_date("dtThree","M/d/yyyy"), "yyyy-MM-dd").alias("dtThree_formatted")
,"iField"
,"vField"
, expr("try_cast(vField as int)").alias("vFieldAsInteger")
)
display(updatedDf)
Each of the transformed column names states what it is; for instance, RowNumberBy_Alid is a column with the rows numbered in order of Alid.
In order for us to use the same transforms that we saw in the SQL basics videos, we must first create the appropriate Window function calls. With the ordering_id window, we order by the column Alid and apply the row_number() functionality. As we expect, each row now has a row number added to it. However, we see that we have duplicate Alid values, so if we want to find the duplicates (the rows that receive a value of 2), we can partition by this same column with the same order. The partitioning window call performs this, and we see our duplicate value of B is labelled 1 and 2 because there are two of them. Finally, we rank our dates using the ordering_date window, but this time passing in dense_rank(). A sketch of filtering on that partition column follows below.
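As a small follow-on (not part of the original code), here is one way you might use the PartitionBy_Alid column to surface the duplicate rows:
# Rows with PartitionBy_Alid > 1 are the repeated copies within each Alid group;
# for this data set, that isolates the duplicated "B" row.
duplicatesDf = updatedDf.filter("PartitionBy_Alid > 1")
display(duplicatesDf)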
When formatting dates, we first have to convert the string to a date. We see this with the to_date function, and notice that this function takes the string format of the dates. That format must match the data, otherwise you’ll get null values back. From there, we then pick the output format of our date, yyyy-MM-dd. Finally, we’ll notice that we use a SQL expression with expr() to try to cast varchars as integers; the varchar (“A”) that is not an integer returns null.
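To illustrate the format-matching point, here is a small sketch (assuming ANSI mode is not enabled, in which case the parse failure could raise an error instead of returning null):
from pyspark.sql.functions import to_date, lit

# The same string parsed with a matching format returns a date,
# while a non-matching format returns null.
checkDf = spark.range(1).select(
    to_date(lit("2-1-2023"), "M-d-yyyy").alias("matching_format"),
    to_date(lit("2-1-2023"), "M/d/yyyy").alias("mismatched_format")
)
display(checkDf)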
Spark SQL
Let’s take our Spark SQL view and use the functionality of ROW_NUMBER(), PARTITION BY and DENSE_RANK() along with editing some of our data values. We’ll look at each of these transformed results one-by-one:
SELECT
Alid
, ROW_NUMBER() OVER (ORDER BY Alid) AS RowNumberBy_Alid
, ROW_NUMBER() OVER (PARTITION BY Alid ORDER BY Alid) AS PartitionBy_Alid
, dtOne
, DENSE_RANK() OVER (ORDER BY dtOne) AS DenseRankBy_dtOne
, dtTwo
, DATE_FORMAT(TO_DATE(dtTwo,"M-d-yyyy"), "yyyy-MM-dd") AS dtTwo_formatted
, dtThree
, DATE_FORMAT(TO_DATE(dtThree,"M/d/yyyy"), "yyyy-MM-dd") AS dtThree_formatted
, iField
, vField
, TRY_CAST(vField AS INT) AS vFieldAsInteger
FROM tableFullDataDf
We use the same naming of transformed columns that we used in the pyspark code.
As we see in the above Spark SQL, our SQL matches what we see in the videos as far as how we’re ordering, partitioning and ranking data. We also see that the Spark SQL returns the same values as the pyspark, though the actual row order may differ.
As for the specific data values, notice the similarity between pyspark and Spark SQL.
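If you prefer to run the statement programmatically rather than in a SQL cell, a minimal sketch against the tableFullDataDf temp view created earlier might look like this (the column list is shortened here for brevity):
# Execute Spark SQL against the registered temp view and capture the result as a dataframe.
sqlResultDf = spark.sql("""
    SELECT
        Alid
        , ROW_NUMBER() OVER (ORDER BY Alid) AS RowNumberBy_Alid
        , ROW_NUMBER() OVER (PARTITION BY Alid ORDER BY Alid) AS PartitionBy_Alid
        , DENSE_RANK() OVER (ORDER BY dtOne) AS DenseRankBy_dtOne
    FROM tableFullDataDf
""")
display(sqlResultDf)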
Pandas
Let’s take our pandas dataframe and use the functionality of ROW_NUMBER(), PARTITION BY and DENSE_RANK() along with editing some of our data values. We’ll look at each of these transformed results one-by-one:
tableFullDf["RowNumberBy_Alid"] = tableFullDf.index + 1
tableFullDf["PartitionBy_Alid"] = tableFullDf.groupby("Alid")["Alid"].rank(method="first").astype(int)
tableFullDf["DenseRankBy_dtOne"] = tableFullDf["dtOne"].rank(method="dense", ascending=True).astype(int)
tableFullDf["dtTwo_formatted"] = pd.to_datetime(tableFullDf["dtTwo"]).dt.strftime('%Y-%m-%d')
tableFullDf["dtThree_formatted"] = pd.to_datetime(tableFullDf["dtThree"]).dt.strftime('%Y-%m-%d')
tableFullDf["vFieldAsInteger"] = pd.to_numeric(tableFullDf["vField"], errors="coerce")
print(tableFullDf[["Alid","RowNumberBy_Alid","PartitionBy_Alid","dtOne","DenseRankBy_dtOne","dtTwo","dtTwo_formatted","dtThree","dtThree_formatted","vField","vFieldAsInteger","iField"]])
We use the same naming of transformed columns that we used in the pyspark code.
Pandas differs significantly from pyspark (and Spark SQL). Let’s look at the functionality in more depth, since this won’t look familiar if you’re coming from a SQL background.
For the column of RowNumberBy_Alid, we’re simply using the built-in index that pandas already has on dataframes and adding 1 to each value. When you print a pandas dataframe, you’ll notice that there is an index starting at 0. By adding 1 to this index, we get the same result that we would get with row_number. Keep in mind that this result is correct only because of how the data is already ordered; if we wanted a different order, we would need to specify that (see the sketch below).
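For example, here is a minimal sketch (not in the original post) of assigning a row number after sorting by iField instead of relying on the existing index order:
# Sort by a different column, then derive a 1-based row number from the new order.
byIField = tableFullDf.sort_values("iField").reset_index(drop=True)
byIField["RowNumberBy_iField"] = byIField.index + 1
print(byIField[["Alid", "iField", "RowNumberBy_iField"]])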
Let’s dig into how the rank works by looking at a more granular example with a set of numbers. We’ll first create a pandas dataframe with a set of numbers ranging from 5 to 20, including some duplicate values.
dense = {
"Numbers": [5,10,10,15,20]
}
denseDf = pd.DataFrame(dense)
Next, let’s rank this set of numbers using methods such as first, dense, average, min and max. An explanation of the results we see is provided below.
denseDf["Rank_First"] = denseDf["Numbers"].rank(method="first").astype(int)
denseDf["Rank_DenseAsc"] = denseDf["Numbers"].rank(method="dense").astype(int)
denseDf["Rank_DenseDesc"] = denseDf["Numbers"].rank(method="dense", ascending=False).astype(int)
denseDf["Rank_Average"] = denseDf["Numbers"].rank(method="average").astype(int)
denseDf["Rank_Min"] = denseDf["Numbers"].rank(method="min").astype(int)
denseDf["Rank_Max"] = denseDf["Numbers"].rank(method="max").astype(int)
- Rank_First returns the numbers ranked by the order in which they appear (notice the index of 0 through 4), starting with 1 and ending with 5.
- Rank_DenseAsc returns an almost identical result to Rank_First, except that the duplicate value of 10 receives one rank of 2. We also see that this returns in ascending order; notice how we can specify the order with rank.
- Rank_DenseDesc returns the inverse of Rank_DenseAsc.
- Rank_Average returns an ascending range of 1 through 5, but skips 3 because of the duplicate values of 10, which are both ranked at 2 (the raw average rank for the two 10s is 2.5, which the astype(int) cast truncates to 2). The value of 15 is ranked at 4. We’ll notice that Rank_Average and Rank_Min return the same result here.
- Rank_Min returns the same result as Rank_Average in this case: it returns the lowest rank position when there are duplicate/multiple values in that rank.
- Rank_Max returns a similar result to Rank_Min, except that it returns the highest rank position when there are duplicate/multiple values in that rank.
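Printing the dataframe makes those differences concrete. The expected ranks listed in the comment below are derived from the pandas rank rules just described (they are not output copied from the original post):
print(denseDf)
# Expected ranks for Numbers = [5, 10, 10, 15, 20] after the astype(int) cast:
#   Rank_First:     1, 2, 3, 4, 5
#   Rank_DenseAsc:  1, 2, 2, 3, 4
#   Rank_DenseDesc: 4, 3, 3, 2, 1
#   Rank_Average:   1, 2, 2, 4, 5   (2.5 for the tied 10s, truncated to 2)
#   Rank_Min:       1, 2, 2, 4, 5
#   Rank_Max:       1, 3, 3, 4, 5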
Now that we see how the rank function returns values, we can see how the rank function in our tableFullDf helps us partition the data (in the case of the PartitionBy_Alid result) and rank the dates (in the case of the DenseRankBy_dtOne result). We also see in the resulting dataframe that we can call the to_datetime function on a column and then specify the output format with strftime. Unlike pyspark or Spark SQL, we do not have to specify the current date format of the column; pandas infers it. Finally, we convert the vField column to numeric, specifying that if a value fails to convert we coerce that result; this returns NaN for the string value that cannot be converted.
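As a small illustration of both behaviors (this snippet is not part of the original code), parsing dates without declaring their input format and coercing a non-numeric value:
import pandas as pd

# The input date format is inferred; only the output format is specified.
dates = pd.to_datetime(pd.Series(["2-1-2023", "3-1-2023"]))
print(dates.dt.strftime('%Y-%m-%d').tolist())

# errors="coerce" turns values that cannot be converted (like "A") into NaN.
print(pd.to_numeric(pd.Series(["7", "10", "A"]), errors="coerce").tolist())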
Note: all images in the post are either created through actual code runs or from Pixabay. The written content of this post is copyrighted; all rights reserved. None of the written content in this post may be used in any artificial intelligence.