Home Машинное обучение Оптимизация обработки данных ETL на Talent.com с помощью Amazon SageMaker | DeepTech

Оптимизация обработки данных ETL на Talent.com с помощью Amazon SageMaker | DeepTech

0
Оптимизация обработки данных ETL на Talent.com с помощью Amazon SageMaker
 | DeepTech

Этот пост написан в соавторстве с Анатолием Хоменко, инженером по машинному обучению, и Абденуром Беззу, техническим директором Talent.com.

Основанная в 2011 году, Talent.com объединяет оплачиваемые списки вакансий от своих клиентов и общедоступные списки вакансий и создала единую, удобную для поиска платформу. Охватывая более 30 миллионов списков вакансий в более чем 75 странах и охватывая различные языки, отрасли и каналы распространения, Talent.com удовлетворяет разнообразные потребности соискателей, эффективно связывая миллионы соискателей с возможностями трудоустройства.

Миссия Talent.com — способствовать глобальным связям рабочей силы. Для достижения этой цели Talent.com объединяет списки вакансий из различных источников в Интернете, предлагая соискателям доступ к обширному пулу из более чем 30 миллионов вакансий, адаптированных к их навыкам и опыту. В соответствии с этой миссией Talent.com в сотрудничестве с AWS разработал передовую систему рекомендаций по вакансиям, основанную на глубоком обучении и призванную помочь пользователям в продвижении по карьерной лестнице.

Чтобы обеспечить эффективную работу этого механизма рекомендаций по вакансиям, крайне важно внедрить крупномасштабный конвейер обработки данных, отвечающий за извлечение и уточнение функций из агрегированных списков вакансий Talent.com. Этот конвейер способен обрабатывать 5 миллионов ежедневных записей менее чем за 1 час и позволяет параллельно обрабатывать записи за несколько дней. Кроме того, это решение позволяет быстро внедрить его в производство. Основным источником данных для этого конвейера является формат JSON Lines, который хранится в Amazon Simple Storage Service (Amazon S3) и секционируется по дате. Каждый день это приводит к созданию десятков тысяч файлов строк JSON с дополнительными обновлениями, происходящими ежедневно.

Основная цель этого конвейера обработки данных — облегчить создание функций, необходимых для обучения и развертывания механизма рекомендации вакансий на Talent.com. Стоит отметить, что этот конвейер должен поддерживать дополнительные обновления и удовлетворять сложные требования к извлечению функций, необходимые для модулей обучения и развертывания, необходимых для системы рекомендаций по вакансиям. Наш конвейер принадлежит к общему семейству процессов ETL (извлечение, преобразование и загрузка), которое объединяет данные из нескольких источников в большой центральный репозиторий.

Дополнительную информацию о том, как Talent.com и AWS совместно разработали передовые методы обработки естественного языка и моделей глубокого обучения с использованием Amazon SageMaker для создания системы рекомендаций по вакансиям, см. в документе «От текста к работе мечты: создание специалиста по рекомендации вакансий на основе НЛП». на Talent.com с помощью Amazon SageMaker. Система включает в себя разработку функций, проектирование архитектуры модели глубокого обучения, оптимизацию гиперпараметров и оценку модели, при этом все модули запускаются с использованием Python.

В этом посте показано, как мы использовали SageMaker для создания крупномасштабного конвейера обработки данных для подготовки функций для механизма рекомендации вакансий на Talent.com. Полученное решение позволяет специалисту по данным анализировать извлечение признаков в блокноте SageMaker с использованием библиотек Python, таких как Scikit-Learn или PyTorch, а затем быстро развернуть тот же код в конвейере обработки данных, выполняя извлечение функций в большом масштабе. Решение не требует переноса кода извлечения функций для использования PySpark, как это требуется при использовании AWS Glue в качестве решения ETL. Наше решение может быть разработано и развернуто исключительно специалистом по анализу данных с использованием только SageMaker и не требует знания других решений ETL, таких как AWS Batch. Это может значительно сократить время, необходимое для развертывания конвейера машинного обучения (ML) в рабочей среде. Конвейер работает через Python и легко интегрируется с рабочими процессами извлечения признаков, что делает его адаптируемым к широкому спектру приложений для анализа данных.

Обзор решения

Обзор конвейера ETL с использованием SageMaker Processing

Трубопровод состоит из трех основных этапов:

  1. Используйте Обработка Amazon SageMaker задание для обработки необработанных файлов JSONL, связанных с указанным днем. Данные за несколько дней могут обрабатываться отдельными заданиями обработки одновременно.
  2. Используйте AWS Glue для сканирования данных после обработки данных за несколько дней.
  3. Загрузите обработанные функции для указанного диапазона дат с помощью SQL из таблицы Amazon Athena, затем обучите и разверните модель рекомендации заданий.

Обработка необработанных файлов JSONL

Мы обрабатываем необработанные файлы JSONL за указанный день с помощью задания обработки SageMaker. Задание реализует извлечение объектов и сжатие данных, а также сохраняет обработанные объекты в файлы Parquet с 1 миллионом записей в каждом файле. Мы используем преимущества распараллеливания ЦП для параллельного извлечения функций для каждого необработанного файла JSONL. Результаты обработки каждого файла JSONL сохраняются в отдельный файл Parquet во временной директории. После обработки всех файлов JSONL мы выполняем сжатие тысяч небольших файлов Parquet в несколько файлов по 1 миллиону записей в каждом. Сжатые файлы Parquet затем загружаются в Amazon S3 как результат выполнения задания по обработке. Сжатие данных обеспечивает эффективное сканирование и SQL-запросы на следующих этапах конвейера.

Ниже приведен пример кода для планирования задания обработки SageMaker на определенный день, например 01 января 2020 г., с использованием SageMaker SDK. Задание считывает необработанные файлы JSONL из Amazon S3 (например, из s3://bucket/raw-data/2020/01/01) и сохраняет сжатые файлы Parquet в Amazon S3 (например, для s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies 
%pip install sagemaker pyarrow s3fs awswrangler

import sagemaker
import boto3

from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput

region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket()

### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8  ### we use 8 process workers
date = "2020-01-01" ### process data for one day

est_cls = SKLearn
framework_version_str = "0.20.0"

### schedule processing job
script_processor = FrameworkProcessor(
    role=role,
    instance_count=1,
    instance_type=instance,
    estimator_cls=est_cls,
    framework_version=framework_version_str,
    volume_size_in_gb=500,
)

script_processor.run(
    code="processing_script.py", ### name of the main processing script
    source_dir="../src/etl/", ### location of source code directory

    ### our processing script loads raw jsonl files directly from S3
    ### this avoids long start-up times of the processing jobs,
    ### since raw data does not need to be copied into instance
    inputs=(), ### processing job input is empty

    outputs=(
        ProcessingOutput(destination="s3://bucket/processed/table-name/",
                         source="/opt/ml/processing/output"),
    ),
    arguments=(
        ### directory with job's output
        "--output", "/opt/ml/processing/output",

        ### temporary directory inside instance
        "--tmp_output", "/opt/ml/tmp_output",

        "--n_jobs", str(n_jobs), ### number of process workers
        "--date", date, ### date to process

        ### location with raw jsonl files in S3
        "--path", "s3://bucket/raw-data/",
    ),
    wait=False
)

Следующая схема кода для основного скрипта (processing_script.py), который запускает задание обработки SageMaker, выглядит следующим образом:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path

### function to process raw jsonl file and save extracted features into parquet file  
from process_data import process_jsonl

### parse command line arguments
args = parse_args()

### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-')))

### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}")

### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}")

### process individual jsonl files in parallel using n_jobs process workers
futures=()
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor:
    for file in jsons:
        inp_file = Path(file)
        out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet")
        ### process_jsonl function reads raw jsonl file from S3 location (inp_file)
        ### and saves result into parquet file (out_file) inside temporary directory
        futures.append(executor.submit(process_jsonl, file, out_file))

    ### wait until all jsonl files are processed
    for future in concurrent.futures.as_completed(futures):
        result = future.result()

### compact parquet files
dataset = ds.dataset(tmp_out)

if len(dataset.schema) > 0:
    ### save compacted parquet files with 1MM records per file
    ds.write_dataset(dataset, out_dir, format="parquet", 
                     max_rows_per_file=1024 * 1024)

Масштабируемость — ключевая особенность нашего конвейера. Во-первых, несколько заданий обработки SageMaker можно использовать для одновременной обработки данных в течение нескольких дней. Во-вторых, мы избегаем одновременной загрузки всех обработанных или необработанных данных в память при обработке данных каждого указанного дня. Это позволяет обрабатывать данные с использованием типов экземпляров, которые не могут разместить данные за полный день в основной памяти. Единственное требование состоит в том, что тип экземпляра должен быть способен одновременно загружать в память N необработанных файлов JSONL или обработанных файлов Parquet, где N — количество используемых рабочих процессов.

Сканирование обработанных данных с помощью AWS Glue

После обработки всех необработанных данных за несколько дней мы можем создать таблицу Athena из всего набора данных с помощью сканера AWS Glue. Мы используем AWS SDK для панд (awswrangler) библиотеку для создания таблицы, используя следующий фрагмент:

import awswrangler as wr

### crawl processed data in S3
res = wr.s3.store_parquet_metadata(
    path="s3://bucket/processed/table-name/",
    database="database_name",
    table="table_name",
    dataset=True,
    mode="overwrite",
    sampling=1.0,
    path_suffix='.parquet',
)

### print table schema
print(res(0))

Загрузка обработанных функций для обучения

Обработанные функции для указанного диапазона дат теперь можно загрузить из таблицы Athena с помощью SQL, а затем эти функции можно использовать для обучения модели рекомендации вакансий. Например, следующий фрагмент загружает обработанные функции за один месяц в DataFrame, используя метод awswrangler библиотека:

import awswrangler as wr

query = """
    SELECT * 
    FROM table_name
    WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' 
"""

### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database="database_name")

Кроме того, использование SQL для загрузки обработанных функций для обучения можно расширить, чтобы учесть различные другие варианты использования. Например, мы можем применить аналогичный конвейер для ведения двух отдельных таблиц Athena: одна для хранения показов пользователей, а другая — для хранения кликов пользователей по этим показам. Используя операторы объединения SQL, мы можем получить показы, на которые пользователи либо нажали, либо не нажали, а затем передать эти показы в задание по обучению модели.

Преимущества решения

Внедрение предлагаемого решения дает ряд преимуществ нашему существующему рабочему процессу, в том числе:

  • Упрощенная реализация – Решение позволяет реализовать извлечение функций в Python с использованием популярных библиотек машинного обучения. И для этого не требуется портировать код в PySpark. Это упрощает извлечение функций, поскольку в этом конвейере будет выполняться тот же код, который был разработан специалистом по данным в блокноте.
  • Быстрый путь к производству – Решение может быть разработано и развернуто специалистом по данным для выполнения масштабного извлечения признаков, что позволит им разработать рекомендательную модель ML на основе этих данных. В то же время то же решение может быть развернуто в производстве инженером по машинному обучению с небольшими изменениями.
  • Многоразовое использование – Решение предоставляет шаблон многократного использования для извлечения признаков в большом масштабе и может быть легко адаптировано для других случаев использования, помимо создания рекомендательных моделей.
  • Эффективность – Решение обеспечивает хорошую производительность: обработка одного дня Talent.comданные заняли менее 1 часа.
  • Дополнительные обновления – Решение также поддерживает инкрементные обновления. Новые ежедневные данные можно обрабатывать с помощью задания обработки SageMaker, а расположение S3, содержащее обработанные данные, можно повторно сканировать для обновления таблицы Athena. Мы также можем использовать задание cron для обновления сегодняшних данных несколько раз в день (например, каждые 3 часа).

Мы использовали этот конвейер ETL, чтобы помочь Talent.com обрабатывать 50 000 файлов в день, содержащих 5 миллионов записей, и создали обучающие данные, используя функции, извлеченные из необработанных данных за 90 дней с Talent.com — всего 450 миллионов записей в 900 000 файлов. Наш конвейер помог Talent.com создать и внедрить систему рекомендаций в производство всего за 2 недели. Решение выполнило все процессы машинного обучения, включая ETL, в Amazon SageMaker без использования других сервисов AWS. Система рекомендаций по вакансиям позволила увеличить рейтинг кликов в онлайн-A/B-тестировании на 8,6 % по сравнению с предыдущим решением на базе XGBoost, помогая миллионам пользователей Talent.com найти лучшие вакансии.

Заключение

В этом посте описывается конвейер ETL, который мы разработали для обработки функций для обучения и развертывания модели рекомендации вакансий на Talent.com. Наш конвейер использует задания обработки SageMaker для эффективной обработки данных и извлечения признаков в больших масштабах. Код извлечения функций реализован на Python, что позволяет использовать популярные библиотеки машинного обучения для извлечения функций в большом масштабе без необходимости портировать код для использования PySpark.

Мы призываем читателей изучить возможность использования конвейера, представленного в этом блоге, в качестве шаблона для сценариев использования, где требуется извлечение функций в большом масштабе. Специалист по данным может использовать конвейер для построения модели машинного обучения, а затем этот же конвейер может использовать инженер машинного обучения для запуска в рабочей среде. Это может значительно сократить время, необходимое для комплексной разработки решения ML, как это было в случае с Talent.com. Читатели могут обратиться к руководство по настройке и запуску заданий обработки SageMaker. Мы также рекомендуем читателям просмотреть публикацию «От текста к работе мечты: создание специалиста по рекомендации вакансий на основе НЛП на Talent.com с помощью Amazon SageMaker», где мы обсуждаем методы обучения модели глубокого обучения с использованием Amazon SageMaker для создания системы рекомендаций по работе Talent.com.


Об авторах

Дмитрий БеспаловДмитрий Беспалов — старший научный сотрудник в лаборатории решений машинного обучения Amazon, где он помогает клиентам AWS из разных отраслей ускорить внедрение искусственного интеллекта и облака.

И СянИ Сян является научным сотрудником второго уровня в лаборатории решений машинного обучения Amazon, где она помогает клиентам AWS в различных отраслях ускорить внедрение искусственного интеллекта и облачных технологий.

Тонг ВанТонг Ван — старший научный сотрудник в лаборатории решений машинного обучения Amazon, где он помогает клиентам AWS из разных отраслей ускорить внедрение искусственного интеллекта и облака.

Анатолий ХоменкоАнатолий Хоменко — старший инженер по машинному обучению в компании Talent.com со страстью к обработке естественного языка, подбирая хороших людей на хорошую работу.

Абденур БеззухАбденур Беззух — руководитель с более чем 25-летним опытом создания и предоставления технологических решений, которые масштабируются для миллионов клиентов. Абденур занимал должность технического директора (CTO) в Talent.com когда команда AWS разработала и реализовала это конкретное решение для Talent.com.

Яньцзюнь ЦиЯньцзюнь Ци — старший менеджер по прикладным наукам в лаборатории решений Amazon Machine Learning. Она внедряет инновации и применяет машинное обучение, чтобы помочь клиентам AWS ускорить внедрение искусственного интеллекта и облака.

LEAVE A REPLY

Please enter your comment!
Please enter your name here