Skip to content
Služby na míru Spolupráce O mně Reference Hotová řešení Kontakt
template_used: item

Když se řekne ETL (Extract, Transform, Load), většina lidí si představí specializované nástroje jako Keboola, Talend, Informatica, Pentaho, Microsoft SQL Server Integration Services nebo dbt. Občas existují ale okamžiky, kdy máte data v SQL databázi a z různých důvodů ETL nástroje nasadit nemůžete. Jak to řeším já u některých klientů?

Proč se obejít bez ETL nástroje?

Použít specializovaný nástroj k úpravě dat je příjemné z řady různých důvodu. Zjednodušuje život, zajišťuje správný běh ve správném pořadí, řeší chyby, notifikace, … Ne vždy je ale život tak krásný a použití SQL a uložených procedur pro ETL může být efektivní volbou. Třeba pokud:

  • Máte omezený rozpočet
  • Umíte SQL, ale s žádným ETL jste nepracovali (pak je ale mé doporučení jasné: naučte se to)
  • Prostředí je tak specifické, že vám neumožňuje nasadit specializované nástroje (DB server, k němu nic jiného a klient odmítá cokoliv dalšího spouštět/platit).

Pojďme se tedy podívat, jak to řeším já. V tomto případě v prostředí PostgreSQL.

Architektura řešení

Naše řešení sestává ze tří hlavních komponent:

  • tabulka ETL příkazů: Obsahuje příkazy k provedení jednotlivých ETL kroků
  • tabulka logů: Ukládá informace o provedení jednotlivých příkazů, včetně doby jejich trvání
  • uložená procedura: Hlavní logika ETL procesu, která provádí jednotlivé příkazy.

Krok 1: Vytvoření ETL tabulky

Nejprve potřebujeme tabulku, která bude obsahovat skripty, které chceme spouštět. Vzhledem k tomu, že je často potřebujeme spouštět v daném pořadí, tak kromě samotného skriptu obsahuje i další údaje v jednotlivých sloupcích:

  • id = id skriptu
  • name = název tabulky, která je skriptem vytvářena (název je pak využíván pro mazání dané tabulky v rámci procedury)
  • command = samostatný SQL skript
  • command_type = označení, do které vrstvy příkaz patří (ty jsou pak zpracovávány v pořadí L0 .. L3)
CREATE TABLE bisuperhero.bisuperhero_etl (
    id serial4 NOT NULL,
    name varchar(50) NULL,
    command text NULL,
    command_type varchar(50) NULL,
    CONSTRAINT bisuperhero_etl_pk PRIMARY KEY (id)
);

Krok 2: Vytvoření tabulky pro logy

Pro sledování výkonu a zaznamenávání událostí používám tabulku s logy. Do té si ukládám, kdy skript proběhl, jak dlouho trval a kdo jej spustil. Je mi následně podkladem pro optimalizaci či nějaký ten bug hunting.

CREATE TABLE bisuperhero.etl_logs (
    id serial4 NOT NULL,
    event_time timestamp DEFAULT CURRENT_TIMESTAMP NULL,
    event_type text NULL,
    table_name text NULL,
    user_name text NULL,
    runtime_ms int4 NULL,
    CONSTRAINT etl_logs_pkey PRIMARY KEY (id)
);

Krok 3: Uložená procedura pro spuštění ETL procesu

Nyní přichází klíčová část – uložená procedura, která provede všechny ETL kroky. Tato procedura prochází příkazy v naší ETL tabulce a vykonává je v určeném pořadí. Celý kód najdete na konci článku, teď se podíváme na její jednotlivé části.

Rozbor kódu

Inicializace a příprava

Procedura začíná inicializací několika proměnných, které budou použity pro sledování času a logování. Proměnná current_user_name je nastavena na aktuálního uživatele, což nám umožňuje logovat, kdo proceduru spustil, loop proměnné zajišťují výpis aktuálního stavu běhu, časové proměnné pak počítají dobu běhu.

DECLARE
    command_record record;
    loop_count integer := 0;
    loop_total integer := 0;
    start_time TIMESTAMP;
    end_time TIMESTAMP;
    runtime_ms INTEGER;
    current_user_name TEXT := current_user;

Mazání původních tabulek

Prvním krokem je mazání původních tabulek. To provádím sadou jednotlivých DROP příkazů, které si název tabulky berou z řídící tabulky. Informaci o smazání dávám do logu.

RAISE NOTICE '================= Deleting original tables =================';    
    FOR command_record IN
        SELECT "name"
        FROM bisuperhero.bisuperhero_etl
        WHERE command_type NOT LIKE '%Temp%'
    LOOP
        start_time := clock_timestamp();
        EXECUTE 'DROP TABLE IF EXISTS bisuperhero.' || command_record."name";
        end_time := clock_timestamp();
        runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
        EXECUTE format('INSERT INTO bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''DROP TABLE'', %L, %L, %s)',
                       command_record."name", current_user_name, runtime_ms);
    END LOOP;

Zpracování jednotlivých vrstev dat

Procedura poté zpracovává jednotlivé vrstvy dat (L0, L1, L2, L3) postupně podle typu příkazu. Pro každou vrstvu je vypočítán celkový počet příkazů a následně jsou příkazy vykonávány v pořadí dle jejich ID.

Nejprve si tedy vytáhnu všechny skripty dané vrstvy a pak je pomocí smyčky spouštím. Pro informaci zároveň vypisuju, kde v rámci tabulek se aktuálně pohybuju. Níže je ukázka zpracování vrstvy L0, ale stejný postup je použit i pro další vrstvy.

RAISE NOTICE '================= Running: L0 Raw data =================';
    loop_count := 0;
    SELECT count(1) INTO loop_total FROM bisuperhero.bisuperhero_etl WHERE command_type = 'L0 Raw data';
    FOR command_record IN
        SELECT command, "name"
        FROM bisuperhero.bisuperhero_etl
        WHERE command_type = 'L0 Raw data'
        ORDER BY id ASC
    LOOP
        loop_count := loop_count + 1;
        RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name";
        start_time := clock_timestamp();
        EXECUTE command_record.command;
        end_time := clock_timestamp();
        runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
        EXECUTE format('INSERT INTO bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
                       command_record."name", current_user_name, runtime_ms);
    END LOOP;

Finální krok: Aktualizace dat

Na závěr procedura aktualizuje tabulku etl_refresh, která zaznamenává čas poslední aktualizace dat. Tu používám jednak pro rychlou kontrolu, kdy procedura naposledy proběhla (aniž bych se musel dívat do logu nebo do cronu. Právě cron totiž podle zadaných pravidel proceduru v daný čas spouští.

start_time := clock_timestamp();
    EXECUTE 'DROP TABLE IF EXISTS bisuperhero.etl_refresh; CREATE TABLE bisuperhero.etl_refresh AS SELECT NOW() AS data_refreshed_at;';
    end_time := clock_timestamp();
    runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
    EXECUTE format('INSERT INTO bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''CREATE TABLE'', %L, %L, %s)',
                   'etl_refresh', current_user_name, runtime_ms);

Další rozšíření

Zobrazená procedura opravdu jen řeší spouštění zpracování a nic jiného. U jiných klientů ji mám složitější. Příkladem může být zpracování dat z účetnictví Pohoda. To má pro každý účetní rok (a každou firmu) samostatnou databázi.

V rámci ETL tabulky (a procedury), tak řešením:

  • vytažení dat ze všech definovaných databází a spojení jednotlivých tabulek (včetně doplnění identifikací o jakou firmu, období a databázi se jedná)
  • logiku, zda chci stahovat všechny databáze, nebo mi pro konkrétní tabulku stačí jen údaje z posledního roku
  • rozdílné chování skriptů podle toho, zda je daný účetní rok už uzavřen nebo ne (očištění o řádky, které se týkají jiných účetních období)
  • rozšíření dimenzí pro následnou analytiku (tedy spojení různých číselníků, doplnění defaultních hodnot pro nevyplněné sloupce, atp.)

Jiným příkladem rozšíření může být automatické zasílání informací na e-mail/slack v okamžiku, kdy se něco rozbije a skripty nedoběhnou. V závislosti na prostředí k tomu mohu využít e-mailingové možnosti SQL agenta, lokální python kód nebo vše kontrolovat právě podle data aktualizace celého ETL až ve finálním reportovacím nástroji.

Shrnutí a celý kód

Použití SQL a uložených procedur pro ETL proces má své výhody i nevýhody. Je to tak trochu škrábání se levou rukou za pravým uchem, ale když to nejde lépe… Hlavní výhodou je nízká cena a relativně vysoká flexibilita. Na druhou stranu může být údržba a rozšíření takového řešení náročnější než použití specializovaného ETL nástroje. Nicméně, pro menší projekty nebo pro ty, kteří chtějí mít plnou kontrolu nad svými datovými toky, je to skvělá alternativa. Rozhodně lepší než psát složité procedury, které se vám budou debugovat výrazně hůře než jednotlivé SQL skripty. Ty můžete totiž vyvíjet postupně.

CREATE OR REPLACE PROCEDURE bisuperhero.bisuperhero_etl()
 LANGUAGE plpgsql
AS $procedure$
DECLARE
    command_record record;
    loop_count integer := 0;
    loop_total integer := 0;
    start_time TIMESTAMP;
    end_time TIMESTAMP;
    runtime_ms INTEGER;
    current_user_name TEXT := current_user;
BEGIN

    RAISE NOTICE '================= Deleting original tables =================';    
    FOR command_record IN
        SELECT "name"
        FROM aliveplatform.bisuperhero.bisuperhero_etl
        WHERE command_type NOT LIKE '%Temp%'
    LOOP
        start_time := clock_timestamp();
        EXECUTE 'DROP TABLE IF EXISTS aliveplatform.bisuperhero.' || command_record."name";
        end_time := clock_timestamp();
        runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
        -- Log the event
        EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''DROP TABLE'', %L, %L, %s)',
                       command_record."name", current_user_name, runtime_ms);
    END LOOP;

    RAISE NOTICE '================= Running: L0 Raw data =================';
    loop_count := 0;
    SELECT count(1) INTO loop_total FROM aliveplatform.bisuperhero.bisuperhero_etl WHERE command_type = 'L0 Raw data';
    FOR command_record IN
        SELECT command, "name"
        FROM aliveplatform.bisuperhero.bisuperhero_etl
        WHERE command_type = 'L0 Raw data'
        ORDER BY id ASC
    LOOP
        loop_count := loop_count + 1; -- Increment loop count
        RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name"; -- Print current row
        start_time := clock_timestamp();
        EXECUTE command_record.command;
        end_time := clock_timestamp();
        runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
        -- Log the event
        EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
                       command_record."name", current_user_name, runtime_ms);
    END LOOP;

    RAISE NOTICE '================= Running: L1 Cleansed data =================';
    loop_count := 0;
    SELECT count(1) INTO loop_total FROM aliveplatform.bisuperhero.bisuperhero_etl WHERE command_type = 'L1 Cleansed data'; 
    FOR command_record IN
        SELECT command, "name"
        FROM aliveplatform.bisuperhero.bisuperhero_etl
        WHERE command_type = 'L1 Cleansed data'
        ORDER BY id ASC
    LOOP
        loop_count := loop_count + 1; -- Increment loop count
        RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name"; -- Print current row
        start_time := clock_timestamp();
        EXECUTE command_record.command;
        end_time := clock_timestamp();
        runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
        -- Log the event
        EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
                       command_record."name", current_user_name, runtime_ms);
    END LOOP;

    RAISE NOTICE '================= Running: L2 Integrated data =================';
    loop_count := 0;
    SELECT count(1) INTO loop_total FROM aliveplatform.bisuperhero.bisuperhero_etl WHERE command_type = 'L2 Integrated data';
    FOR command_record IN
        SELECT command, "name"
        FROM aliveplatform.bisuperhero.bisuperhero_etl
        WHERE command_type = 'L2 Integrated data'
        ORDER BY id ASC
    LOOP
        loop_count := loop_count + 1; -- Increment loop count
        RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name"; -- Print current row
        start_time := clock_timestamp();
        EXECUTE command_record.command;
        end_time := clock_timestamp();
        runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
        -- Log the event
        EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
                       command_record."name", current_user_name, runtime_ms);
    END LOOP;

    RAISE NOTICE '================= Running: L3 Consumable data =================';
    loop_count := 0;
    SELECT count(1) INTO loop_total FROM aliveplatform.bisuperhero.bisuperhero_etl WHERE command_type = 'L3 Consumable data';
    FOR command_record IN
        SELECT command, "name"
        FROM aliveplatform.bisuperhero.bisuperhero_etl 
        WHERE command_type = 'L3 Consumable data'
        ORDER BY id ASC
    LOOP
        loop_count := loop_count + 1; -- Increment loop count
        RAISE NOTICE 'Currently executing row % / %: %', loop_count, loop_total, command_record."name"; -- Print current row
        start_time := clock_timestamp();
        EXECUTE command_record.command;
        end_time := clock_timestamp();
        runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
        -- Log the event
        EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''EXECUTE COMMAND'', %L, %L, %s)',
                       command_record."name", current_user_name, runtime_ms);
    END LOOP;

    start_time := clock_timestamp();
    EXECUTE 'DROP TABLE IF EXISTS aliveplatform.bisuperhero.etl_refresh; CREATE TABLE aliveplatform.bisuperhero.etl_refresh AS SELECT NOW() AS data_refreshed_at;';
    end_time := clock_timestamp();
    runtime_ms := EXTRACT(EPOCH FROM end_time - start_time) * 1000;
    -- Log the event
    EXECUTE format('INSERT INTO aliveplatform.bisuperhero.etl_logs (event_type, table_name, user_name, runtime_ms) VALUES (''CREATE TABLE'', %L, %L, %s)',
                   'etl_refresh', current_user_name, runtime_ms);

END;
$procedure$
;

Robert Junek

Robert Junek

BI konzultant a datový analytik

Pomáhám firmám proměnit data ve strategickou výhodu a jasný příběh pro vedení.

Spojme se