Building A Simple Data Pipeline Part 3: Putting the SQL in PanSQL

So you’ve got a network sync job set up. But simply copying the data isn’t enough; you want to run transformations on it. You could do this after the sync finishes, with an external tool such as DBT, but some transformations are simpler to do first, especially ones that cut down on the amount of data sent over the network, such as WHERE filters or aggregates. For those, it can be advantageous to run them inside your sync script.

PanSQL supports running a limited subset of SQL SELECT queries inside the script itself. This is not a full SQL implementation, and is not intended to ever be one; we have DBT for that. But if you want to run some simple transforms on your data, mostly ones that can be done in a single pass over the data, you can do so with inline SQL statements.

This will be a bit more abstract than the first two tutorials, because we don’t know what your source database looks like. But databases in general tend to have enough in common that we can set out some general ideas.

Let’s say you don’t want the complete contents of one of your database tables; you only want an aggregate report. “What are the total sales figures for each sales category?” for example. If you know anything about SQL, you probably already have a query running through your head that says something like SELECT Category, SUM(SalesTotal) FROM Orders GROUP BY Category, right? In your destination database, go ahead and create a new table to hold the results of that query. Let’s say it’s called Report, with columns CategoryID and SalesTotal, and the source table it’s reporting on is called Orders.
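
For instance, in a SQL Server-style destination, the DDL might look something like this (a sketch only; the types here are hypothetical, so match them to your own Orders table):

CREATE TABLE Report (
    CategoryID int NOT NULL,            -- will hold Orders.Category
    SalesTotal decimal(18, 2) NOT NULL  -- will hold SUM(Orders.SalesTotal)
);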

Now make a copy of your analyze script. In the copy, change the connection string (and the database type, if necessary) to the destination database’s values. On the analyze line, change the part that says with optimize to say with exclude(Orders). This means exactly what it sounds like: the analyzer will exclude the Orders table from its output. (If you want to specify a set of tables to analyze and ignore everything else, you can use with include() instead. If there’s more than one table, separate the names with commas.) Also change the output file name to something new, like \Pansynchro\DestDictionary.pansync. Save this, build it, and run it. Because it’s not running an optimize pass, it should produce the destination dictionary very quickly.
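
The edited copy might end up looking roughly like the sketch below. This is not authoritative syntax; keep the exact structure your original analyze script already uses, and treat the statement shapes, variable names, and connection string here as placeholders:

open db as MsSql for analyze with 'Server=destHost;Database=DestDB;...'
analyze db as DestData with exclude(Orders)
save DestData to '\Pansynchro\DestDictionary.pansync'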

Now open up the server sync script again. Add a second load line under the first to load this new data dictionary under a new variable name, and change the open line for the writer to use the new dictionary (there’s a rough sketch of the result a little further down). Then insert a few blank lines between the open commands and the sync command; this is where we’ll write our transform. Start with something like the following, changing the names if necessary to match your data:

stream orders as myDataDict.Orders
stream report as outDataDict.Report

Here we’re declaring two variables representing the data streams coming into and out of the sync process. The names orders and report are the names of the variables in the script, and the definition after the as keyword tells the script compiler where they’re located in the data dictionaries. (Remember how the first tutorial said that the script compiler reads and processes the data dictionaries as part of building the script? This is where that becomes relevant.)
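
To make that concrete, the top of the modified server script might now look roughly like this. Again, a sketch only: the open lines are placeholders for whatever your script from the last tutorial already contains, and the only real changes are the second load line and the writer’s dictionary:

load myDataDict from '\Pansynchro\MyDictionary.pansync'
load outDataDict from '\Pansynchro\DestDictionary.pansync'
open mySource as MsSql for read with myDataDict, '<source connection string>'
open myDest as Network for write with outDataDict, '<client address>'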

Declaring these as stream means that they’re data streams to be processed one row at a time. It’s also possible to declare them as table instead; this means that the entire stream is read into an in-memory table and can be JOINed or otherwise used in more flexible ways. But because it reads the entire stream into memory, a table variable should not be used for any large table with millions of rows.
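
For example, a small lookup table could be declared like this (Categories is a hypothetical table in the source dictionary) and then safely used on the right-hand side of a join:

table categories as myDataDict.Categories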

Now let’s write the transformation itself:

select o.Category as CategoryID, sum(o.SalesTotal) as SalesTotal
from orders o
group by o.Category
into report

A PanSQL transformation is written as just an ordinary SQL query. It must end with an into clause directing the output to a data variable: either a stream that’s going to be written out, or a table. The script compiler will verify that the output schema matches what’s in the data dictionary; if it’s not compatible, it will raise an error. (Field names and types must be compatible. You can use standard SQL aliases in the SELECT clause to rename fields as necessary; that’s what the CategoryID and SalesTotal aliases above are doing.)

Make sure that the transformation appears before the sync command. Save the script, then open your client script and modify it to use the new data dictionary. Save that too, rebuild everything, then run it and verify that everything works.

Of course, not all data in an ETL job comes from a database. In the next tutorial, we’ll look at how to work with data coming from other sources, such as local files or REST APIs.

Notes:

  • A stream can only be processed once, one row at a time. If you want to use any JOINs in your script, everything you join to a stream must be a table.
  • PanSQL supports inner joins only. This is a temporary limitation that will change in the future.
  • PanSQL supports WHERE, GROUP BY, and HAVING clauses. Things that would require the whole stream to be held in memory, such as ORDER BY, SELECT DISTINCT, and window functions, are not supported. (GROUP BY is a bit of a gray area here: aggregates are built up in memory as running totals, rather than by holding the entire data set in memory, so to avoid excessive memory consumption, try to keep the total number of groups your queries produce reasonably low.)
  • Sub-selects and CTEs are likewise not supported, though you can emulate them to some degree by writing your sub-select as its own transformation that outputs to a table variable, which can then be processed further in another transformation (see the sketch after this list). Support for this is likely to improve with time.
  • Due to internal technical limitations, a transformation with a GROUP BY clause can currently contain at most seven aggregates in its SELECT clause. This is a temporary limitation that will change in the future.
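
To illustrate the sub-select workaround (and the joins-need-tables rule from the first note), here’s a sketch of emulating a sub-select by staging it into a table variable and then joining the result to a stream. Every name here is hypothetical, and the comments assume SQL-style -- comment syntax:

stream sales as myDataDict.Sales
stream details as myDataDict.OrderDetails
table totals as outDataDict.CategoryTotals
stream detailReport as outDataDict.DetailReport

-- First pass: the "sub-select", staged into an in-memory table.
select s.Category as CategoryID, sum(s.SalesTotal) as SalesTotal
from sales s
group by s.Category
into totals

-- Second pass: a further transformation that joins the staged table
-- (a table, not a stream) and directs the output to the writer.
select d.OrderID, d.ItemName, t.SalesTotal
from details d
join totals t on d.CategoryID = t.CategoryID
into detailReport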