Ваши скрипты на Python при написании конвейеров данных (data pipeline) должны иметь свойство идемпотентности (idempotent). Это значит, что сколько бы вы раз не запускали скрипт с одними и теми же входными данными, он должен выдавать тот же результат. Данное свойство очень важно в ETL-инструментах или оркестраторах, например, Apache Airflow. Было бы странно, если бы вы перезапустили DAG (направленный ациклический граф), а он постоянно бы выдавал разные результаты. В этой статье пойдет речь о частых ошибках, которые допускают Python-программисты и инженеры данных при написании кода в распределенных системах.
Загрузка данных в базу данных
Одна из самых распространенных операций является чтение из файла в базу данных. Данные при чтении из файла буфферизуется, т.е. временно хранятся в памяти компьютера. Поэтому для обеспечения идемпотентности не должно происходить никаких блокировок или изменения буффера.
Одна из распространенных ошибок при чтении — использовать жестко заданный путь к файлу. Это приведет к конфликту, если несколько процессов попытаются прочитать или записать в один и тот же файл одновременно. Ниже приведен сценарий чтения из Amazon s3 в файл с именем tmp.csv
, затем копирования данных их него в базу данных, после всего этого файл удаляется.
import boto3 import os import psycopg2 nfile = transaction.csv s3 = boto3.client("s3") s3.download_file("BUCKET_NAME", "OBJECT_NAME", nfile) conn = psycopg2.connect( "host=localhost dbname=postgres user=postgres") cur = conn.cursor() with open(nfile, "r") as f: next(f) # skip headers line cur.copy_from(f, transaction, sep=",") conn.commit() os.remove(nfile)
В чем проблема данного скрипта на Python? Если два процесса попытаются записать в этот файл одновременно, это приведет к дублированию данных в БД. Этот код имеет ещё один существенный изъян: если один процесс удалит файл до того, как другой закончит запись, тогда данные не мигрируют в БД, а мы этого не заметим.
Что делать? — использовать временный файл, строку или потоков байтов. Строку и потоков байтов, которые работают с буфферами оперативной памяти, можно использовать, если данных не много и компьютер обладает достаточными объемами этой самой памяти. Для записи в строку воспользуйтесь встроенным в Python объектом io.StringIO
; для хранения потока байтов — io.Bytes
.
Временный файл — это более универсальное решение, поскольку файлы могут быть настолько большими, насколько позволяет файловая система. Правда, для очень больших файлов этот метод непрактичен и слишком медленный. У нас есть различные для вас курсы для работы с Big Data.
В Python есть бибилотека tempfile, которая позволяет создавать временные файлы и директории внутри директории /tmp
. Создавать файл можно внутри контекстного менеджера, который после окончания “позовет” ОС подчистить всё. Итак, Python-код выше с использованием tempfile
переписывается так:
s3 = boto3.client("s3") s3.download_file("BUCKET_NAME", "OBJECT_NAME", "transaction.csv") conn = psycopg2.connect("host=localhost dbname=postgres user=postgres") cur = conn.cursor() # или через поток байтов: # with io.BytesIO() as f: with tempfile.NamedTemporaryFile(mode="wb") as f: s3.download_file("BUCKET_NAME", "OBJECT_NAME", f.name) f.seek(0) cur.copy_from(f, transaction, sep=",") conn.commit()
Транзакции SQL
Рекомендуется использовать контекстные менеджеры Python для работы с базами данных. Например, в следующем коде для удаления дубликатов:
conn = psycopg2.connect("bname=pos user=pos") cur = conn.cursor() cur.execute(""" DELETE FROM transaction WHERE created_at = '2019-02-12' """) conn.commit() cur.execute(""" INSERT INTO transaction WHERE created_at = '2019-02-12 SELECT * FROM loading' """) conn.commit()
— если скрипт упадет сразу после DELETE
, то операция INSERT
не произойдет. Поэтому транзакции выполнять через контекстный менеджер и выполнить commit
в конце:
conn = psycopg2.connect("bname=pos user=pos") with conn.cursor() as cur: cur.execute("DELETE FROM ...") cur.execute("INSERT INTO ...") conn.commit()
Функциональное программирование в Python
В функциональном программировании есть такое понятие как “чистая” (pure) функция. Это такая функция, которая выполняется без побочных эффектов или изменения входных параметров. Чистая функция идемпотентна, ведь она возвращает тот же результат в ответ на те же входные параметры. В реализации подразумевается обычное создание нового объекта с последующим его возвращением.
Под побочным эффектом подразумевается, что функция не должна делать что-то ещё, помимо того ради чего она вызвана. Поэтому, например, функция, находящая медиану, не должна что-то выводить какое-нибудь приглашение пользователя, общаться с ним, изменять глобальные переменные или что-то в этом роде. Конечно, полностью избавиться от них невозможно, но каким-то образом выделены они должны быть (да хотя бы комментарием).
В Python, к тому же, есть изменяемые (mutable) и неизменяемые (immutable) объекты. К первому типу относят список (list) и словарь (dict). Передавая их в качестве аргумента и изменяя их внутри функции, вы изменяете их самих. В терминах языка Си это бы обозначало, что эти объекты передаются по ссылке, а не по значению. Поэтому не пишите такой код, если не хотите менять изначальный объект:
def min_max_scale(s: list): min_val = min(s) max_val = max(s) for i in range(len(s)): s[i] = (s[i] - min_val) / (max_val - min_val) return s
Вместо этого создайте внутри функции новый:
def min_max_scale(s: list): min_val = min(s) max_val = max(s) new_series = [] for i in range(len(s)): new_series = (s[i] - min_val) / (max_val - min_val) return s
Надеемся, что вы поймете важность свойства идемпотентности при создании конвейеров данных на Python в распредеденных системах. Ещё больше о написании конвейеров обработки больших данных на Python вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
- DPREP: Подготовка данных для Data Mining на Python
- AIRF: Курс Apache AirFlow
- Курс Data pipeline на Apache AirFlow и Arenadata Hadoop