Common Transforms Solved In Pyspark, SQL and Pandas

SQLInSix Minutes
8 min readAug 14, 2021
Common Transforms Solved In Pyspark, SQL and Pandas

In the below videos, we look at the ROW_NUMBER(), PARTITION BY and DENSE_RANK() functionality:

Some questions that are answered in these videos:

  • In the examples, we’re using new tables that have data in them to demonstrate the ROW_NUMBER() functionality along with using some tables that we’ve used in other examples.
  • When we review the examples, what happens if we have multiple columns that we order by? How would we specify that if we wanted Column1 to come before Column2?
  • What can we pass into the ROW_NUMBER() functionality that allows us to invert the order of a data set? How could this be useful?
  • Think of a math question that ROW_NUMBER() would help us solve. Any question that involves first ordering data (ie: median, mode as simple examples) will highlight where this functionality will be useful.
  • In the examples, we’ll notice that we’re using a table with mixed values — some completely new in one column and another column with duplicates. These demos are live, so we are using live data.
  • What is the difference between partitioning by the column ID and the column Letter?
  • What happens when we insert a new record that is duplicate in both ID, Letter, and Val? How does the PARTITION BY with ROW_NUMBER() function here?
  • While people tend to think of PARTITION BY as helpful to remove duplicates, how would this functionality possibly help us with scaling horizontally and why?
  • You’ll notice that the table being used has merchants by names and product ids associated by the merchant. As a note, we would normally use an associative table here, but for the sake of this video, we are using this contrived example, as it helps to illustrate how we can use DENSE_RANK().
  • In the video, both ROW_NUMBER() and DENSE_RANK() are compared side-by-side. Using merchant AAA as an example, how do these two return different values?
  • Suppose we had 100 home sales by three states of Oklahoma, Kentucky and North Carolina — what would be the maximum DENSE_RANK() value if we ranked by state? What would be the maximum ROW_NUMBER()?
  • In considering “groups” and “ordering” what would we use for “ordering” and what would we use for “grouping” when comparing ROW_NUMBER() or DENSE_RANK()?

Test Data For Our Pyspark, SQL and Pandas Code

For our next few examples, we’ll use a different data set as we’ll see the above transforms along with some data level transformations. The below code blocks relate to either Pyspark or Pandas.

Pyspark Test Data

tableFullData = sc.parallelize([
{"Alid": "A", "dtOne": "2023-01-01", "dtTwo": "1-1-2023", "dtThree": "1/1/2023", "iField": 10, "vField": "7"},
{"Alid": "B", "dtOne": "2023-02-01", "dtTwo": "2-1-2023", "dtThree": "2/1/2023", "iField": 15, "vField": "9"},
{"Alid": "B", "dtOne": "2023-02-01", "dtTwo": "2-1-2023", "dtThree": "2/1/2023", "iField": 15, "vField": "9"},
{"Alid": "C", "dtOne": "2023-03-01", "dtTwo": "3-1-2023", "dtThree": "3/1/2023", "iField": 12, "vField": "4"},
{"Alid": "D", "dtOne": "2023-01-07", "dtTwo": "2-1-2023", "dtThree": "2/5/2023", "iField": 101, "vField": "10"},
{"Alid": "E", "dtOne": "2023-04-01", "dtTwo": "4-1-2023", "dtThree": "4/1/2023", "iField": 22, "vField": "A"}
])
tableFullDataDf = spark.createDataFrame(tableFullDataDf)
tableFullDataDf.createOrReplaceTempView("tableFullDataDf") ### For Spark SQL

Pandas Test Data

tableFull = {
"Alid": ["A","B","B","C","D","E"],
"dtOne": ["2023-01-01","2023-02-01","2023-02-01","2023-03-01","2023-01-07","2023-04-01"],
"dtTwo": ["1-1-2023","2-1-2023","2-1-2023","3-1-2023","2-1-2023","4-1-2023"],
"dtThree": ["1/1/2023","2/1/2023","2/1/2023","3/1/2023","2/5/2023","4/1/2023"],
"iField": [10, 15, 15, 12, 101, 22],
"vField": ["7","9","9","4","10","A"],
}

tableFullDf = pd.DataFrame(tableFull)

Remember that you can create a pandas dataframe or pyspark dataframe from the inverse (ie: pandas dataframe to pyspark dataframe).

You can take these dataframes and edit them or even make your own as needed to create contrived data sets for unit testing. This really helps if you develop using test driven development. One of the advantages of test driven development that sometimes gets overlooked is the speed at which you verify that your functionality works as expected. When you deal with development environments that have 1TB of data, using a small data set to test functionality can be a huge time saver, as even the development environment has a huge size (and in most cases, a development environment should be a true development environment in matching how a production environment is).

Pyspark

Let’s take our pyspark dataframe and use the functionality of ROW_NUMBER(), PARTITION BY and DENSE_RANK() along with editing some of our data values. We’ll look at each of these transformed results one-by-one:

from pyspark.sql import functions
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, date_format, expr

ordering_id = Window.orderBy("Alid")
partitioning = Window.partitionBy("Alid").orderBy("Alid")
ordering_date = Window.orderBy("dtOne")
updatedDf = tableFullDataDf \
.withColumn("RowNumberBy_Alid", functions.row_number().over(ordering_id)) \
.withColumn("PartitionBy_Alid", functions.row_number().over(partitioning)) \
.withColumn("DenseRankBy_dtOne", functions.dense_rank().over(ordering_date)) \
.select(
"Alid"
,"RowNumberBy_Alid"
,"PartitionBy_Alid"
,"dtOne"
,"DenseRankBy_dtOne"
,"dtTwo"
, date_format(to_date("dtTwo","M-d-yyyy"), "yyyy-MM-dd").alias("dtTwo_formatted")
,"dtThree"
, date_format(to_date("dtThree","M/d/yyyy"), "yyyy-MM-dd").alias("dtThree_formatted")
,"iField"
,"vField"
, expr("try_cast(vField as int)").alias("vFieldAsInteger")
)
display(updatedDf)

Each of the transformed column names state what they are — for instance, RowNumberBy_Alid is a column with the rows ordered.

In order for us to use the same transforms that we saw in the SQL basics’ videos, we must first create the appropriate Window functioning calls. With the ordering_id window function, we want to see the column Alid ordered using the row_number() functionality. As we expect, each row now has a row number added to it. However, we see that we have duplicate Alid values, so if we want to find the duplicates where they have a value of 2, we can partition by this same column with the same order. The partitioning window call performs this and we see our duplicate value of B is labelled as 1 and 2 because there are 2 of them. Finally, we rank our dates using the ordering_date window function, but this time passing in dense_rank().

When formatting dates, we first have to convert the string to a date. We see this with the to_date function and notice that this function uses the string format of the dates. This must match, otherwise you’ll get nothing. From there, we then pick the format of our date to yyyy-MM-dd. Finally, we’ll notice that we use a SQL expression with expr() to try to cast varchars as integers and the varchar (“A”) that is not an integer returns null.

Spark SQL

Let’s take our Spark SQL view and use the functionality of ROW_NUMBER(), PARTITION BY and DENSE_RANK() along with editing some of our data values. We’ll look at each of these transformed results one-by-one:

SELECT 
Alid
, ROW_NUMBER() OVER (ORDER BY Alid) AS RowNumberBy_Alid
, ROW_NUMBER() OVER (PARTITION BY Alid ORDER BY Alid) AS PartitionBy_Alid
, dtOne
, DENSE_RANK() OVER (ORDER BY dtOne) AS DenseRankBy_dtOne
, dtTwo
, DATE_FORMAT(TO_DATE(dtTwo,"M-d-yyyy"), "yyyy-MM-dd") AS dtTwo_formatted
, dtThree
, DATE_FORMAT(TO_DATE(dtThree,"M/d/yyyy"), "yyyy-MM-dd") AS dtThree_formatted
, iField
, vField
, TRY_CAST(vField AS INT) AS vFieldAsInteger
FROM tableFullDataDf

We use the same naming of transformed columns that we used in the pyspark.

As we see in the above Spark SQL, our SQL matches what we see in the videos as far as how we’re ordering, partitioning and ranking data. We also see how the Spark SQL results in the same results as the pyspark as far as the values (the actual order will differ).

As for the specific data values, notice the similarity between pyspark and Spark SQL.

Pandas

Let’s take our pandas dataframe and use the functionality of ROW_NUMBER(), PARTITION BY and DENSE_RANK() along with editing some of our data values. We’ll look at each of these transformed results one-by-one:

tableFullDf["RowNumberBy_Alid"] = tableFullDf.index + 1
tableFullDf["PartitionBy_Alid"] = tableFullDf.groupby("Alid")["Alid"].rank(method="first").astype(int)
tableFullDf["DenseRankBy_dtOne"] = tableFullDf["dtOne"].rank(method="dense", ascending=True).astype(int)
tableFullDf["dtTwo_formatted"] = pd.to_datetime(tableFullDf["dtTwo"]).dt.strftime('%Y-%m-%d')
tableFullDf["dtThree_formatted"] = pd.to_datetime(tableFullDf["dtThree"]).dt.strftime('%Y-%m-%d')
tableFullDf["vFieldAsInteger"] = pd.to_numeric(tableFullDf["vField"], errors="coerce")
print(tableFullDf[["Alid","RowNumberBy_Alid","PartitionBy_Alid","dtOne","DenseRankBy_dtOne","dtTwo","dtTwo_formatted","dtThree","dtThree_formatted","vField","vFieldAsInteger","iField"]])

We use the same naming of transformed columns that we used in the pyspark.

Pandas differs significantly from pyspark (and Spark SQL). Let’s look at the functionality with more depth, as if you’re coming from any SQL language this won’t look familiar.

For the column of RowNumberBy_Alid, we’re simply using the built-in index that pandas already has one dataframes and adding 1 to each value. When you print a pandas’ dataframe, you’ll notice that there is an index starting at 0. By adding 1 to this index, we get the same result that we would get with row_number. Keep in mind that this result is correct only because of how we’ve ordered the data — for instance, if we wanted a different order, we would need to specify that.

Let’s dig into how the rank works by looking at a more granular example with a set of numbers. We’ll first create a pandas’ dataframe with a set of numbers ranging from 5 to 20 and some duplicate numerical values.

dense = {
"Numbers": [5,10,10,15,20]
}

denseDf = pd.DataFrame(dense)

Next, let’s rank this set of numbers using methods, such as first, dense, average, min and max. An explanation of the results we see is provided below this.


denseDf["Rank_First"] = denseDf["Numbers"].rank(method="first").astype(int)
denseDf["Rank_DenseAsc"] = denseDf["Numbers"].rank(method="dense").astype(int)
denseDf["Rank_DenseDesc"] = denseDf["Numbers"].rank(method="dense", ascending=False).astype(int)
denseDf["Rank_Average"] = denseDf["Numbers"].rank(method="average").astype(int)
denseDf["Rank_Min"] = denseDf["Numbers"].rank(method="min").astype(int)
denseDf["Rank_Max"] = denseDf["Numbers"].rank(method="max").astype(int)
  • Rank_First returns the numbers ranked by the order in which they appear (notice the index of 0 through 4), starting with 1 and ending with 5.
  • Rank_DenseAsc almost returns an identical result to Rank_First, except that we see the duplicate value of 10 receives one rank of 2. We also see that this returns in ascending order — notice how we can specify the order with rank.
  • Rank_DenseDesc returns the inverse of Rank_DenseAsc.
  • Rank_Average returns an ascending range of 1 through 5, but skips 3 because of the duplicate values of 10 which are both ranked at 2. The value of 15 is ranked at 4. We’ll notice that Rank_Average and Rank_Min return the same result.
  • Rank_Min returns the same result as Rank_Average in this case. Rank_Min returns the lowest value of the rank if there are duplicate/multiple values in that rank.
  • Rank_Max returns a similar result to Rank_Min, except that it returns the highest value of the rank if there are duplicate/multiple values in that rank.

Now that we see how the rank function returns values, we can see how the rank function in our tableFullDf will help us partition the data (in the case of PartitionBy_Alid result) and rank the date (in the case of DenseRankBy_dtOne result). We also see in the resulting dataframe that we can call the to_datetime function on a column in the dataframe and specify the format. Unlike pyspark or Spark SQL, we do not have to specify the current date and time format of the column. Finally, we convert the vField column to an integer specifying that if a value fails we coerce that result — this returns NaN for the string value that cannot be converted.

Note: all images in the post are either created through actual code runs or from Pixabay. The written content of this post is copyright; all rights reserved. None of the written content in this post may be used in any artificial intelligence.

--

--