When people hear ETL (Extract, Transform, Load), they usually imagine specialized tools like Keboola, Talend, Informatica, Pentaho, Microsoft SQL Server Integration Services, or dbt. But sometimes you have data in a SQL database and, for various reasons, cannot deploy an ETL tool. How do I solve this for some of my clients?
Why go without an ETL tool?
Using a specialized tool to transform data is pleasant for a number of reasons: it simplifies life, ensures the steps run in the right order, handles errors, sends notifications, and so on. But life isn't always that beautiful, and using SQL and stored procedures for ETL can be an effective choice. For example, if:
- You have a limited budget
- You know SQL but haven't worked with any ETL tool (though my recommendation is clear: learn one)
- The environment is so restrictive that it doesn't allow you to deploy specialized tools (a DB server and nothing else, and the client refuses to run or pay for anything more).
So let's see how I solve it. In this case, in a PostgreSQL environment.
Solution Architecture
Our solution consists of three main components:
- ETL commands table: Contains commands to execute individual ETL steps
- Log table: Stores information about the execution of individual commands, including their duration
- Stored procedure: The main logic of the ETL process that executes individual commands.
Step 1: Creating the ETL Table
First, we need a table that will contain the scripts we want to run. Since they often need to run in a specific order, the table contains, in addition to the script itself, a few other columns:
- id = script id
- name = name of the table created by the script (the name is later used to drop the table within the procedure)
- command = standalone SQL script
- command_type = which layer the command belongs to (layers are then processed in order L0 .. L3)
CREATE TABLE bisuperhero.bisuperhero_etl (
id serial4 NOT NULL,
name varchar(50) NULL,
command text NULL,
command_type varchar(50) NULL,
CONSTRAINT bisuperhero_etl_pk PRIMARY KEY (id)
);
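For illustration, the control table might be populated like this. The table names and scripts here are hypothetical examples (a `source` schema and simple invoice tables), not part of the actual solution:

```sql
-- Hypothetical example rows; adjust names and scripts to your own data model
INSERT INTO bisuperhero.bisuperhero_etl ("name", command, command_type) VALUES
('l0_invoices',
 'CREATE TABLE bisuperhero.l0_invoices AS SELECT * FROM source.invoices;',
 'L0 Raw data'),
('l1_invoices',
 'CREATE TABLE bisuperhero.l1_invoices AS
  SELECT id, trim(customer) AS customer, amount
  FROM bisuperhero.l0_invoices
  WHERE amount IS NOT NULL;',
 'L1 Cleansed data');
```

The `id` column is filled automatically by the serial sequence, and the `ORDER BY id` in the procedure then guarantees the insertion order is the execution order within each layer.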
Step 2: Creating the Log Table
For performance monitoring and event logging, I use a log table. I store when the script ran, how long it took, and who ran it. It subsequently serves as a basis for optimization or some bug hunting.
CREATE TABLE bisuperhero.etl_logs (
id serial4 NOT NULL,
event_time timestamp DEFAULT CURRENT_TIMESTAMP NULL,
event_type text NULL,
table_name text NULL,
user_name text NULL,
runtime_ms int4 NULL,
CONSTRAINT etl_logs_pkey PRIMARY KEY (id)
);
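Once the log fills up, it becomes a handy basis for optimization. For example, a simple query like this (a sketch, not part of the procedure) lists the slowest logged steps:

```sql
-- The ten slowest logged steps, most recent first among ties
SELECT table_name, event_type, runtime_ms, event_time
FROM bisuperhero.etl_logs
ORDER BY runtime_ms DESC NULLS LAST, event_time DESC
LIMIT 10;
```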
Step 3: Stored Procedure for Running the ETL Process
Now comes the key part: the stored procedure that executes all the ETL steps. It iterates through the commands in our ETL table and runs them in the specified order. You can find the full code at the end of the article; for now, let's look at its individual parts.
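The procedure itself is invoked with a plain CALL, which is also what the scheduler runs:

```sql
CALL bisuperhero.bisuperhero_etl();
```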
Code Analysis
Initialization and Preparation
The procedure begins by declaring several variables used for time tracking and logging. The current_user_name variable is set to the current user so we can log who started the procedure, the loop variables drive the progress output of the current run, and the time variables are used to calculate each step's runtime.
DECLARE
command_record record;
loop_count integer := 0;
loop_total integer := 0;
start_time TIMESTAMP;
end_time TIMESTAMP;
runtime_ms INTEGER;
current_user_name TEXT := current_user;
Deleting Original Tables
The first step is dropping the previous versions of the tables. I do this with a series of individual DROP commands that take the table name from the control table, and I write a record of each drop into the log.
RAISE NOTICE '================= Deleting original tables =================';
FOR command_record IN
SELECT "name"
FROM bisuperhero.bisuperhero_etl
WHERE command_type NOT LIKE '%Temp%'
LOOP
start_time := clock_timestamp();
EXECUTE format('DROP TABLE IF EXISTS bisuperhero.%I', command_record."name");
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
EXECUTE format('INSERT INTO bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''DROP TABLE'', %L, %L, %s)',
command_record."name", current_user_name, runtime_ms);
END LOOP;
Processing Individual Data Layers
The procedure then processes individual data layers (L0, L1, L2, L3) sequentially according to the command type. For each layer, the total number of commands is calculated, and then the commands are executed in order of their ID.
So first I pull all the scripts of the given layer and then run them in a loop. For orientation, I also print the current progress across the tables. Below is an example of processing the L0 layer; the same approach is used for the other layers.
RAISE NOTICE '================= Running: L0 Raw data =================';
loop_count := 0;
SELECT count(1) INTO loop_total FROM bisuperhero.bisuperhero_etl WHERE command_type = 'L0 Raw data';
FOR command_record IN
SELECT command, "name"
FROM bisuperhero.bisuperhero_etl
WHERE command_type = 'L0 Raw data'
ORDER BY id ASC
LOOP
loop_count := loop_count + 1;
RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name";
start_time := clock_timestamp();
EXECUTE command_record.command;
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
EXECUTE format('INSERT INTO bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
command_record."name", current_user_name, runtime_ms);
END LOOP;
Final Step: Data Update
Finally, the procedure rebuilds the etl_refresh table, which records the time of the last data update. I use it as a quick check of when the procedure last ran (without having to dig through the log or cron). Cron is what triggers the procedure at the given times according to the defined schedule.
start_time := clock_timestamp();
EXECUTE 'DROP TABLE IF EXISTS bisuperhero.etl_refresh; CREATE TABLE bisuperhero.etl_refresh AS SELECT NOW() AS data_refreshed_at;';
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
EXECUTE format('INSERT INTO bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''CREATE TABLE'', %L, %L, %s)',
'etl_refresh', current_user_name, runtime_ms);
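As an illustration, the cron schedule might look like the line below. The database name, schedule, and log path are placeholders; adjust them to your environment:

```
# Run the ETL procedure every day at 03:00
0 3 * * * psql -d aliveplatform -c 'CALL bisuperhero.bisuperhero_etl();' >> /var/log/etl.log 2>&1
```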
Further Extensions
The procedure shown here really handles only the execution itself and nothing else. For other clients I have more complex versions. One example is processing data from the Pohoda accounting system, which has a separate database for each fiscal year (and each company).
Within the ETL table (and procedure), I solve:
- extracting data from all defined databases and joining individual tables (including adding identification of which company, period, and database it is)
- logic for deciding whether to load all databases, or whether data from the last year is enough for a specific table
- different script behavior depending on whether the given fiscal year is already closed or not (cleaning up rows that relate to other accounting periods)
- extending dimensions for subsequent analytics (i.e., joining various code lists, filling in default values for unfilled columns, etc.)
Another possible extension is automatically sending a message to email/Slack the moment something breaks and the scripts don't finish. Depending on the environment, I can use the emailing capabilities of the SQL agent, local Python code, or check everything via the update timestamp of the whole ETL in the final reporting tool.
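If you want the procedure itself to record failures rather than just stopping mid-run, each EXECUTE can be wrapped in a nested exception block. A minimal sketch (not part of the procedure above) could look like this inside the loop:

```sql
-- Sketch: log a failed command before re-raising, so monitoring can react
BEGIN
    EXECUTE command_record.command;
EXCEPTION WHEN OTHERS THEN
    INSERT INTO bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms)
    VALUES ('ERROR: ' || SQLERRM, command_record."name", current_user_name, NULL);
    RAISE;  -- re-raise so the scheduler / notification layer notices the failure
END;
```

The re-raise is deliberate: the log row captures what failed, while the propagated error lets cron or the monitoring tool detect the unfinished run.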
Summary and Full Code
Using SQL and stored procedures for the ETL process has its advantages and disadvantages. It's a bit like scratching your right ear with your left hand, but when there's no better way... The main advantage is low cost and relatively high flexibility; on the other hand, maintaining and extending such a solution can be more demanding than using a specialized ETL tool. Still, for smaller projects, or for anyone who wants full control over their data flows, it's a great alternative. It's certainly better than writing one huge, complex procedure, which would be significantly harder to debug than individual SQL scripts that you can develop gradually.
CREATE OR REPLACE PROCEDURE bisuperhero.bisuperhero_etl()
LANGUAGE plpgsql
AS $procedure$
DECLARE
command_record record;
loop_count integer := 0;
loop_total integer := 0;
start_time TIMESTAMP;
end_time TIMESTAMP;
runtime_ms INTEGER;
current_user_name TEXT := current_user;
BEGIN
RAISE NOTICE '================= Deleting original tables =================';
FOR command_record IN
SELECT "name"
FROM aliveplatform.bisuperhero.bisuperhero_etl
WHERE command_type NOT LIKE '%Temp%'
LOOP
start_time := clock_timestamp();
EXECUTE format('DROP TABLE IF EXISTS aliveplatform.bisuperhero.%I', command_record."name");
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
-- Log the event
EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''DROP TABLE'', %L, %L, %s)',
command_record."name", current_user_name, runtime_ms);
END LOOP;
RAISE NOTICE '================= Running: L0 Raw data =================';
loop_count := 0;
SELECT count(1) INTO loop_total FROM aliveplatform.bisuperhero.bisuperhero_etl WHERE command_type = 'L0 Raw data';
FOR command_record IN
SELECT command, "name"
FROM aliveplatform.bisuperhero.bisuperhero_etl
WHERE command_type = 'L0 Raw data'
ORDER BY id ASC
LOOP
loop_count := loop_count + 1; -- Increment loop count
RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name"; -- Print current row
start_time := clock_timestamp();
EXECUTE command_record.command;
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
-- Log the event
EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
command_record."name", current_user_name, runtime_ms);
END LOOP;
RAISE NOTICE '================= Running: L1 Cleansed data =================';
loop_count := 0;
SELECT count(1) INTO loop_total FROM aliveplatform.bisuperhero.bisuperhero_etl WHERE command_type = 'L1 Cleansed data';
FOR command_record IN
SELECT command, "name"
FROM aliveplatform.bisuperhero.bisuperhero_etl
WHERE command_type = 'L1 Cleansed data'
ORDER BY id ASC
LOOP
loop_count := loop_count + 1; -- Increment loop count
RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name"; -- Print current row
start_time := clock_timestamp();
EXECUTE command_record.command;
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
-- Log the event
EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
command_record."name", current_user_name, runtime_ms);
END LOOP;
RAISE NOTICE '================= Running: L2 Integrated data =================';
loop_count := 0;
SELECT count(1) INTO loop_total FROM aliveplatform.bisuperhero.bisuperhero_etl WHERE command_type = 'L2 Integrated data';
FOR command_record IN
SELECT command, "name"
FROM aliveplatform.bisuperhero.bisuperhero_etl
WHERE command_type = 'L2 Integrated data'
ORDER BY id ASC
LOOP
loop_count := loop_count + 1; -- Increment loop count
RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name"; -- Print current row
start_time := clock_timestamp();
EXECUTE command_record.command;
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
-- Log the event
EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
command_record."name", current_user_name, runtime_ms);
END LOOP;
RAISE NOTICE '================= Running: L3 Consumable data =================';
loop_count := 0;
SELECT count(1) INTO loop_total FROM aliveplatform.bisuperhero.bisuperhero_etl WHERE command_type = 'L3 Consumable data';
FOR command_record IN
SELECT command, "name"
FROM aliveplatform.bisuperhero.bisuperhero_etl
WHERE command_type = 'L3 Consumable data'
ORDER BY id ASC
LOOP
loop_count := loop_count + 1; -- Increment loop count
RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name"; -- Print current row
start_time := clock_timestamp();
EXECUTE command_record.command;
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
-- Log the event
EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
command_record."name", current_user_name, runtime_ms);
END LOOP;
start_time := clock_timestamp();
EXECUTE 'DROP TABLE IF EXISTS aliveplatform.bisuperhero.etl_refresh; CREATE TABLE aliveplatform.bisuperhero.etl_refresh AS SELECT NOW() AS data_refreshed_at;';
end_time := clock_timestamp();
runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
-- Log the event
EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''CREATE TABLE'', %L, %L, %s)',
'etl_refresh', current_user_name, runtime_ms);
END;
$procedure$
;