Probably one of the best parts of being in Data Engineering for a decade is the ability to sit back and watch new things come along. It’s fun to see what the community dreams up.
This one I didn’t see coming. UDTFs (User-Defined Table Functions).
I’ve been a hater of Python UDFs in the context of Spark for a long time. Almost all the UDFs I’ve seen could be replaced by simply using the out-of-the-box functions provided by Spark. UDFs have bad performance on large datasets, especially poorly written ones.
I’m not sure what to think about UDTFs, but whatever. I guess in my old age I just let people do what they want. If they want to foot-gun themselves, then so be it; it’s a good way to learn.
UDTFs - User-Defined Table Functions
Python User-defined Table Functions (UDTFs) came about in Spark version 3.5, so they’re very new. The normal UDFs that have been around in Spark for some time are called “scalar functions”: they return a single value.
Think about a PySpark Dataframe and then apply a UDF to it: the UDF returns a single value per row and is applied across the entire Dataframe row-by-row.
This differs from UDTFs. A UDTF can return an entire “table” of data (multiple rows and columns) if you want it to.
This is an interesting idea, is it not?
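The difference is easier to see in plain Python. The sketch below is not Spark code; word_count and SplitWords are made-up names used only to contrast "one value per row" with "zero or more rows per row". In Spark, the class would additionally be wrapped with the @udtf decorator.

```python
from typing import Iterator, Tuple


def word_count(text: str) -> int:
    """Scalar-style function: one input row in, exactly one value out."""
    return len(text.split())


class SplitWords:
    """Table-style function: one input row in, zero or more output rows out."""

    def eval(self, text: str) -> Iterator[Tuple[str, int]]:
        # Each yield becomes one output row: (word, position_in_text).
        for position, word in enumerate(text.split()):
            yield (word, position)


rows = ["hello big world", ""]

# Scalar: the output has exactly as many entries as the input.
scalar_result = [word_count(text) for text in rows]

# Table: the output row count is independent of the input row count.
table_result = [out for text in rows for out in SplitWords().eval(text)]

print(scalar_result)  # [3, 0]
print(table_result)   # [('hello', 0), ('big', 1), ('world', 2)]
```

Note how the empty string still produces a value from the scalar function, but contributes no rows at all from the table function.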
I am curious what the performance is like, if UDTFs are as slow as UDFs, but whatever, I promised I would be nice and not complain about it.
I get it, the thing about UDFs or UDTFs is that they allow those organizations who are entrenched in Spark for their workloads, to be able to do things “outside the box” of what Spark usually is used for.
It’s flexibility.
Flexibility has its place and time.
Trying out UDTFs in Databricks.
If you know me, you know I’m not a fan of theory, I prefer to actually try something out. Kick ye’ old tires if you know what I mean. The best way to learn in Data Engineering is to DO code. Strike while the iron is hot.
So, in case you didn’t know, Databricks offers a free Community Edition, and I’m not sure why more people don’t take advantage of it. It’s the perfect way to test things out and learn new things.
Things to know about UDTFs.
I’m just going to rattle off some things you should know about UDTFs, but I assume you will learn more from looking at the one we build.
Import udtf from pyspark.sql.functions
UDTFs are written as Python classes
Can use Python decorator @udtf
UDTFs can be registered and used in the SQL context.
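Since a UDTF is just a Python class, its shape can be sketched and exercised without a cluster. The example below is a rough local approximation, not how Spark actually executes things: run_locally is a hypothetical helper that mimics a single partition, and UppercaseWithSummary is an invented class. It uses eval (required) plus terminate, an optional method in Spark's Python UDTF interface that runs after the last input row and may yield extra rows.

```python
from typing import Iterable, Iterator, List, Tuple


class UppercaseWithSummary:
    # In Spark this class would carry a decorator along the lines of
    # @udtf(returnType="value: string"); it is left off here so the
    # sketch runs as plain Python.

    def __init__(self) -> None:
        self.rows_seen = 0

    def eval(self, text: str) -> Iterator[Tuple[str]]:
        # Called once per input row; each yield is one output row.
        self.rows_seen += 1
        yield (text.upper(),)

    def terminate(self) -> Iterator[Tuple[str]]:
        # Called once after all input rows have been processed.
        yield (f"rows seen: {self.rows_seen}",)


def run_locally(handler_cls, inputs: Iterable[str]) -> List[tuple]:
    """Hypothetical helper: drive a UDTF-style class over some inputs."""
    handler = handler_cls()
    output: List[tuple] = []
    for value in inputs:
        output.extend(handler.eval(value))
    if hasattr(handler, "terminate"):
        output.extend(handler.terminate())
    return output


result = run_locally(UppercaseWithSummary, ["acme", "widgets"])
print(result)  # [('ACME',), ('WIDGETS',), ('rows seen: 2',)]
```

A helper like this is mostly handy for unit tests, where spinning up a Spark session just to check the logic inside eval is overkill.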
Trying out UDTFs in Databricks.
Let’s just try one out, I’ve never used one, so this should be interesting, but I think we will learn the most doing this.
The whole point of using a UDF or UDTF is that we are reaching for some functionality that is outside Spark. So, let’s come up with a pretend scenario.
Since LLMs and AI are popular today, let’s do something along those lines. We are going to pretend we have a dataset/dataframe of company documents, including their text, and we have a pipeline built in Spark to massage the data in preparation for LLM work.
Let’s see if we can write a UDTF to do some tokenizer stuff. During the messing around with data in an LLM context, text needs to be tokenized … and HuggingFace has a PyPI Python package that does this.
This is clearly work you probably can’t do easily with Spark out of the box. So our UDTF will use the Python package tokenizers to do this work on a Dataframe.
First, let’s write the UDTF.
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
from tokenizers.pre_tokenizers import Whitespace


@udtf(returnType="document_id: int, document_name: string, text: string")
class PreTokenizerUDTF:
    def eval(self, row: Row):
        pre_tokenizer = Whitespace()
        yield (row["document_id"], row["document_name"], pre_tokenizer.pre_tokenize_str(row["text"]))
Let’s explain what’s happening.
This UDTF is made up of a Python class PreTokenizerUDTF and also has the @udtf decorator. It has a single method called eval (required). eval takes a Row as its argument because we will be passing an entire SQL context table in, and Spark calls eval once for each row of that table.
Note that we have to have a returnType specified. In our case, it’s the same columns as our dataframe/SQL table, with text now holding the value our UDTF calculates.
We yield the result, which is the row’s columns, including pre_tokenizer.pre_tokenize_str(row["text"]), which is what we are using the UDTF for in the first place … the pre_tokenize_str method.
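A variation worth considering leans harder on the "table" part: yield one output row per token instead of one row per document. The sketch below is an assumption-laden stand-in, not the code from this post. It avoids importing tokenizers by using the regex that the Whitespace pre-tokenizer is documented to split on (\w+|[^\w\s]+), and TokenPerRowUDTF, pre_tokenize, and the token column are invented names. A plain dict plays the part of a Spark Row.

```python
import re
from typing import Iterator, List, Tuple

# Stand-in for tokenizers' Whitespace pre-tokenizer (assumption: same regex).
_WHITESPACE = re.compile(r"\w+|[^\w\s]+")


def pre_tokenize(text: str) -> List[Tuple[str, Tuple[int, int]]]:
    """Return (token, (start, end)) pairs, similar in shape to pre_tokenize_str."""
    return [(m.group(), m.span()) for m in _WHITESPACE.finditer(text)]


class TokenPerRowUDTF:
    # In Spark this would be decorated with something like:
    # @udtf(returnType="document_id: int, document_name: string, token: string")

    def eval(self, row) -> Iterator[Tuple[int, str, str]]:
        # One input document fans out into many output rows, one per token.
        for token, _offsets in pre_tokenize(row["text"]):
            yield (row["document_id"], row["document_name"], token)


sample = {"document_id": 1, "document_name": "Company Goals", "text": "Sell 10% more."}
tokens = list(TokenPerRowUDTF().eval(sample))
for out_row in tokens:
    print(out_row)
```

Every output column here is a simple int or string, so the declared return type and the yielded values line up without any conversion.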
Let’s write a sample Dataframe to run this against.
dataframe = spark.createDataFrame(
    [
        (1, "Company Goals", "We want to sell 10% more ACME Widgets into international markets this year."),
        (2, "Company Document", "Our Marketing strategy at ACME, INC is to be active on MySpace."),
    ],
    [
        "document_id",
        "document_name",
        "text",
    ],
)
Running the UDTF
So now we can actually register our UDTF in an SQL Context and run it.
dataframe.createOrReplaceTempView("company_text")
spark.udtf.register("my_first_udtf", PreTokenizerUDTF)

output = spark.sql("""
    SELECT *
    FROM my_first_udtf(TABLE(company_text))
""")
output.display()
Sorta makes sense, right?
In the code above we register our UDTF, register our Dataframe as an SQL table, and then call the UDTF in the FROM clause of our SQL, passing in the table.
Below you can see the results from my Community Edition of Databricks.
Something wrong with the text output, but hey, you get the point.
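A likely culprit, though this is a guess without seeing the output: pre_tokenize_str returns a list of (token, (start, end)) pairs rather than a string, while the return type declares text as a string. The sketch below shows two possible ways to line the value up with the schema. It uses a regex stand-in for the Whitespace pre-tokenizer (the pattern it is documented to split on) so it runs without the tokenizers package; pre_tokenize, as_string, and as_list are illustrative names only.

```python
import re
from typing import List, Tuple

# Stand-in for tokenizers' Whitespace pre-tokenizer (assumption: same regex).
_WHITESPACE = re.compile(r"\w+|[^\w\s]+")


def pre_tokenize(text: str) -> List[Tuple[str, Tuple[int, int]]]:
    return [(m.group(), m.span()) for m in _WHITESPACE.finditer(text)]


raw = pre_tokenize("ACME, INC")
print(raw)  # [('ACME', (0, 4)), (',', (4, 5)), ('INC', (6, 9))]

# Option 1: keep "text: string" in the return type and join the tokens.
as_string = " ".join(token for token, _offsets in raw)
print(as_string)  # ACME , INC

# Option 2: keep the tokens separate and declare the column as
# "tokens: array<string>" in the return type instead.
as_list = [token for token, _offsets in raw]
print(as_list)  # ['ACME', ',', 'INC']
```

Either way, the point is the same: whatever eval yields for a column should match the type declared for it in returnType.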
Thoughts
My thoughts on UDTFs are surprisingly nice for once in my grumpy life. They are pretty easy to use once you do your first one. I find the fact that you can pass and return entire tables to be very nice and convenient, especially in the SQL context.
I think this will become very useful in the future with AI and LLM use cases that will be coming down the highway soon.
One of the problems with LLMs and the Data Engineering work that surrounds them is the scale of data operations needed to prep large datasets for use downstream.
Databricks and Delta Lake are the perfect toolset for this type of work that needs to scale. UDTFs in the context of Spark and Delta Lake doing custom transformations … well that’s incredibly powerful.