Советы для тех, кто пишет скрипты для ETL

автор рубрика ,
Советы для тех, кто пишет скрипты для ETL

Ваши скрипты на Python при написании конвейеров данных (data pipeline) должны иметь свойство идемпотентности (idempotent). Это значит, что сколько бы вы раз не запускали скрипт с одними и теми же входными данными, он должен выдавать тот же результат. Данное свойство очень важно в ETL-инструментах или оркестраторах, например, Apache Airflow. Было бы странно, если бы вы перезапустили DAG (направленный ациклический граф), а он постоянно бы выдавал разные результаты. В этой статье пойдет речь о частых ошибках, которые допускают Python-программисты и инженеры данных при написании кода в распределенных системах.

Загрузка данных в базу данных

Одна из самых распространенных операций является чтение из файла в базу данных. Данные при чтении из файла буфферизуется, т.е. временно хранятся в памяти компьютера. Поэтому для обеспечения идемпотентности не должно происходить никаких блокировок или изменения буффера.

Одна из распространенных ошибок при чтении — использовать жестко заданный путь к файлу. Это приведет к конфликту, если несколько процессов попытаются прочитать или записать в один и тот же файл одновременно. Ниже приведен сценарий чтения из Amazon s3 в файл с именем tmp.csv, затем копирования данных их него в базу данных, после всего этого файл удаляется.

import boto3
import os
import psycopg2

nfile = transaction.csv

s3 = boto3.client("s3")
s3.download_file("BUCKET_NAME", "OBJECT_NAME", nfile)

conn = psycopg2.connect( "host=localhost dbname=postgres user=postgres")
cur = conn.cursor()
with open(nfile, "r") as f:
    next(f)  # skip headers line
    cur.copy_from(f, transaction, sep=",")
conn.commit()
os.remove(nfile) 

В чем проблема данного скрипта на Python? Если два процесса попытаются записать в этот файл одновременно, это приведет к дублированию данных в БД. Этот код имеет ещё один существенный изъян: если один процесс удалит файл до того, как другой закончит запись, тогда данные не мигрируют в БД, а мы этого не заметим.

Что делать? — использовать временный файл, строку или потоков байтов. Строку и потоков байтов, которые работают с буфферами оперативной памяти, можно использовать, если данных не много и компьютер обладает достаточными объемами этой самой памяти. Для записи в строку воспользуйтесь встроенным в Python объектом io.StringIO; для хранения потока байтов — io.Bytes.

Временный файл — это более универсальное решение, поскольку файлы могут быть настолько большими, насколько позволяет файловая система. Правда, для очень больших файлов этот метод непрактичен и слишком медленный. У нас есть различные для вас курсы для работы с Big Data.

В Python есть бибилотека tempfile, которая позволяет создавать временные файлы и директории внутри директории /tmp. Создавать файл можно внутри контекстного менеджера, который после окончания “позовет” ОС подчистить всё. Итак, Python-код выше с использованием tempfile переписывается так:

s3 = boto3.client("s3")
s3.download_file("BUCKET_NAME", "OBJECT_NAME", "transaction.csv")

conn = psycopg2.connect("host=localhost dbname=postgres user=postgres")
cur = conn.cursor()

# или через поток байтов:
# with io.BytesIO() as f:
with tempfile.NamedTemporaryFile(mode="wb") as f:
   s3.download_file("BUCKET_NAME", "OBJECT_NAME", f.name)
   f.seek(0)
   cur.copy_from(f, transaction, sep=",")

 conn.commit()

Транзакции SQL

Рекомендуется использовать контекстные менеджеры Python для работы с базами данных. Например, в следующем коде для удаления дубликатов:

conn = psycopg2.connect("bname=pos user=pos")
cur = conn.cursor() 
cur.execute("""
    DELETE FROM transaction 
    WHERE created_at = '2019-02-12'
""")
conn.commit()
cur.execute("""
    INSERT INTO transaction
    WHERE created_at = '2019-02-12
    SELECT * FROM loading'
""")
conn.commit() 

— если скрипт упадет сразу после DELETE, то операция INSERT не произойдет. Поэтому транзакции выполнять через контекстный менеджер и выполнить commit в конце:

conn = psycopg2.connect("bname=pos user=pos")
with conn.cursor() as cur:
    cur.execute("DELETE FROM ...")
    cur.execute("INSERT INTO ...")
    conn.commit()

Функциональное программирование в Python

В функциональном программировании есть такое понятие как “чистая” (pure) функция. Это такая функция, которая выполняется без побочных эффектов или изменения входных параметров. Чистая функция идемпотентна, ведь она возвращает тот же результат в ответ на те же входные параметры. В реализации подразумевается обычное создание нового объекта с последующим его возвращением.

Под побочным эффектом подразумевается, что функция не должна делать что-то ещё, помимо того ради чего она вызвана. Поэтому, например, функция, находящая медиану, не должна что-то выводить какое-нибудь приглашение пользователя, общаться с ним, изменять глобальные переменные или что-то в этом роде. Конечно, полностью избавиться от них невозможно, но каким-то образом выделены они должны быть (да хотя бы комментарием).

В Python, к тому же, есть изменяемые (mutable) и неизменяемые (immutable) объекты. К первому типу относят список (list) и словарь (dict). Передавая их в качестве аргумента и изменяя их внутри функции, вы изменяете их самих. В терминах языка Си это бы обозначало, что эти объекты передаются по ссылке, а не по значению. Поэтому не пишите такой код, если не хотите менять изначальный объект:

def min_max_scale(s: list):
min_val = min(s)
max_val = max(s)
for i in range(len(s)):
    s[i] = (s[i] - min_val) / (max_val - min_val)
return s

Вместо этого создайте внутри функции новый:

def min_max_scale(s: list):
min_val = min(s)
max_val = max(s)
new_series = []
for i in range(len(s)):
   new_series = (s[i] - min_val) / (max_val - min_val)
return s

 

Надеемся, что вы поймете важность свойства идемпотентности при создании конвейеров данных на Python в распредеденных системах. Ещё больше о написании конвейеров обработки больших данных на Python вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Источники
  1. https://www.babbling.fish/elt-cookbook-python/
Комментировать