Home Машинное обучение Рекомендации и шаблоны проектирования для построения рабочих процессов машинного обучения с помощью Amazon SageMaker Pipelines | DeepTech

Рекомендации и шаблоны проектирования для построения рабочих процессов машинного обучения с помощью Amazon SageMaker Pipelines | DeepTech

0
Рекомендации и шаблоны проектирования для построения рабочих процессов машинного обучения с помощью Amazon SageMaker Pipelines
 | DeepTech

Amazon SageMaker Pipelines — это полностью управляемый сервис AWS для создания и управления рабочими процессами машинного обучения (ML). SageMaker Pipelines предлагает разработчикам приложений машинного обучения возможность управлять различными этапами рабочего процесса машинного обучения, включая загрузку данных, преобразование данных, обучение, настройку и развертывание. Вы можете использовать SageMaker Pipelines для организации заданий машинного обучения в SageMaker, а его интеграция с более крупной экосистемой AWS также позволяет вам использовать такие ресурсы, как функции AWS Lambda, задания Amazon EMR и многое другое. Это позволяет вам создать настраиваемый и воспроизводимый конвейер для конкретных требований ваших рабочих процессов машинного обучения.

В этом посте мы предлагаем некоторые лучшие практики, позволяющие максимизировать ценность SageMaker Pipelines и сделать процесс разработки более простым. Мы также обсуждаем некоторые распространенные сценарии и шаблоны проектирования при построении конвейеров SageMaker и приводим примеры их решения.

Лучшие практики для конвейеров SageMaker

В этом разделе мы обсудим некоторые лучшие практики, которым можно следовать при разработке рабочих процессов с использованием SageMaker Pipelines. Их внедрение может улучшить процесс разработки и упростить оперативное управление SageMaker Pipelines.

Используйте Pipeline Session для отложенной загрузки конвейера

Конвейерная сессия включает отложенную инициализацию ресурсов конвейера (задания не запускаются до выполнения конвейера). PipelineSession контекст наследует Сессия SageMaker и реализует удобные методы взаимодействия с другими объектами и ресурсами SageMaker, такими как учебные задания, конечные точки, входные наборы данных в Amazon Simple Storage Service (Amazon S3) и т. д. При определении конвейеров SageMaker вам следует использовать PipelineSession в рамках обычного сеанса SageMaker:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge’,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

Запускайте конвейеры в локальном режиме для экономичных и быстрых итераций во время разработки.

Вы можете запустить конвейер в локальном режиме, используя команду LocalPipelineSession контекст. В этом режиме конвейер и задания выполняются локально с использованием ресурсов на локальном компьютере, а не ресурсов, управляемых SageMaker. Локальный режим обеспечивает экономичный способ итерации кода конвейера с меньшим подмножеством данных. После локального тестирования конвейера его можно масштабировать для работы с помощью PipelineSession контекст.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=local_pipeline_session,
)

Управление конвейером SageMaker посредством управления версиями

Управление версиями артефактов и определений конвейеров является общим требованием жизненного цикла разработки. Вы можете создать несколько версий конвейера, назвав объекты конвейера уникальным префиксом или суффиксом, наиболее распространенным из которых является отметка времени, как показано в следующем коде:

from sagemaker.workflow.pipeline_context import PipelineSession
import time

current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline(
    name=pipeline_name,
    steps=(step_process, step_train, step_eval, step_cond),
    sagemaker_session=pipeline_session,
)

Организация и отслеживание выполнения конвейера SageMaker посредством интеграции с SageMaker Experiments.

SageMaker Pipelines можно легко интегрировать с SageMaker Experiments для организации и отслеживания работы конвейера. Это достигается за счет указания Конфигурация КонвейераЭксперимента во время создания объект трубопровода. С помощью этого объекта конфигурации вы можете указать имя эксперимента и имя испытания. Детали запуска конвейера SageMaker организуются в соответствии с указанным экспериментом и пробной версией. Если вы не укажете имя эксперимента явно, в качестве имени эксперимента будет использоваться имя конвейера. Аналогично, если вы явно не укажете имя пробной версии, в качестве имени пробной версии или группы запуска будет использоваться идентификатор запуска конвейера. См. следующий код:

Pipeline(
    name="MyPipeline",
    parameters=(...),
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name = ExecutionVariables.PIPELINE_NAME,
        trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID
        ),
    steps=(...)
)

Безопасно запускайте конвейеры SageMaker в частном VPC.

Чтобы защитить рабочие нагрузки машинного обучения, рекомендуется развертывать задания, организованные SageMaker Pipelines, в конфигурации безопасной сети в частном VPC, частных подсетях и группах безопасности. Чтобы обеспечить и обеспечить использование этой безопасной среды, вы можете реализовать следующую политику AWS Identity and Access Management (IAM) для роли исполнения SageMaker (эту роль принимает на себя конвейер во время его работы). Вы также можете добавить политику для запуска заданий, организованных SageMaker Pipelines, в режиме сетевой изоляции.

# IAM Policy to enforce execution within a private VPC

{

    "Action": (

        "sagemaker:CreateProcessingJob",
        "sagemaker:CreateTrainingJob",
        "sagemaker:CreateModel"
    ),

    "Resource": "*",
    "Effect": "Deny",
    "Condition": {
        "Null": {
            "sagemaker:VpcSubnets": "true"
        }
    }
}

# IAM Policy to enforce execution in network isolation mode
{

    "Version": "2012-10-17",
    "Statement": (
        {
            "Effect": "Deny",
            "Action": (
                "sagemaker:Create*"
            ),
            "Resource": "*",
            "Condition": {
                "StringNotEqualsIfExists": {
                    "sagemaker:NetworkIsolation": "true"
                }
            }
        }
    )
}

Пример реализации конвейера с этими элементами управления безопасностью см. Управление заданиями, регистрация моделей и непрерывное развертывание с помощью Amazon SageMaker в безопасной среде.

Отслеживайте стоимость прогонов конвейера с помощью тегов

Использование конвейеров SageMaker само по себе бесплатно; вы платите за вычислительные ресурсы и ресурсы хранения, которые вы запускаете в рамках отдельных этапов конвейера, таких как обработка, обучение и пакетный вывод. Чтобы агрегировать затраты на запуск конвейера, вы можете включать теги на каждый шаг конвейера, на котором создается ресурс. Затем на эти теги можно ссылаться в обозревателе затрат для фильтрации и агрегирования общей стоимости работы конвейера, как показано в следующем примере:

sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    tags=({'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'})
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    ...
)

В обозревателе затрат теперь можно получить стоимость, отфильтрованную по тегу:

response = client.get_cost_and_usage(
    TimePeriod={
        'Start': '2023-07-01',
        'End': '2023-07-15'
        },
    Metrics=('BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'),
    Granularity='MONTHLY',
    Filter={
        'Dimensions': {
            'Key':'USAGE_TYPE',
            'Values': (
                ‘SageMaker:Pipeline’
            )
        },
        'Tags': {
            'Key': 'keyName',
            'Values': (
                'keyValue',
                )
        }
    }
)

Шаблоны проектирования для некоторых распространенных сценариев

В этом разделе мы обсуждаем шаблоны проектирования для некоторых распространенных случаев использования конвейеров SageMaker.

Запустите облегченную функцию Python, используя шаг Lambda.

Функции Python повсеместно присутствуют в рабочих процессах машинного обучения; они используются при предварительной и постобработке, оценке и т. д. Lambda — это бессерверная вычислительная служба, которая позволяет запускать код без подготовки серверов и управления ими. С помощью Lambda вы можете запускать код на предпочитаемом вами языке, включая Python. Вы можете использовать это для запуска собственного кода Python как части вашего конвейера. Шаг Lambda позволяет запускать функции Lambda как часть конвейера SageMaker. Начните со следующего кода:

%%writefile lambdafunc.py

import json

def lambda_handler(event, context):
    str1 = event("str1")
    str2 = event("str2")
    str3 = str1 + str2
    return {
        "str3": str3
    }

Создайте функцию Lambda, используя Помощник Lambda для SageMaker Python SDK:

from sagemaker.lambda_helper import Lambda

def create_lambda(function_name, script, handler):
    response = Lambda(
        function_name=function_name,
        execution_role_arn=role,
        script= script,
        handler=handler,
        timeout=600,
        memory_size=10240,
    ).upsert()

    function_arn = response('FunctionArn')
    return function_arn

fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

Вызовите шаг Lambda:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)

str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String)

# Lambda Step
step_lambda1 = LambdaStep(
    name="LambdaStep1",
    lambda_func=Lambda(
        function_arn=fn_arn
    ),
    inputs={
        "str1": "Hello",
        "str2": " World"
    },
    outputs=(str3),
)

Передача данных между шагами

Входные данные для шага конвейера — это либо доступное расположение данных, либо данные, созданные одним из предыдущих шагов конвейера. Вы можете предоставить эту информацию в виде ProcessingInput параметр. Давайте рассмотрим несколько сценариев использования ProcessingInput.

Сценарий 1. Передайте выходные данные (примитивные типы данных) шага Lambda на шаг обработки.

Примитивные типы данных относятся к скалярным типам данных, таким как строка, целое число, логическое значение и число с плавающей запятой.

Следующий фрагмент кода определяет лямбда-функцию, которая возвращает словарь переменных с примитивными типами данных. Код вашей функции Lambda вернет JSON пар ключ-значение при вызове на этапе Lambda в конвейере SageMaker.

def handler(event, context):
    ...
    return {
        "output1": "string_value",
        "output2": 1,
        "output3": True,
        "output4": 2.0,
    }

Затем в определении конвейера вы можете определить параметры конвейера SageMaker, относящиеся к определенному типу данных, и установить переменную для вывода функции Lambda:

from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

# 1. Define the output params of the Lambda Step

str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float)

# 2. Lambda step invoking the lambda function and returns the Output

step_lambda = LambdaStep(
    name="MyLambdaStep",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
        session=PipelineSession(),
        ),
    inputs={"arg1": "foo", "arg2": "foo1"},
    outputs=(
        str_outputParam, int_outputParam, bool_outputParam, float_outputParam
        ),
)

# 3. Extract the output of the Lambda

str_outputParam = step_lambda.properties.Outputs("output1")

# 4. Use it in a subsequent step. For ex. Processing step

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
    role=role
)

processor_args = sklearn_processor.run(
    code="code/preprocess.py", #python script to run
    arguments=("--input-args", str_outputParam)
)

step_process = ProcessingStep(
    name="processstep1",
    step_args=processor_args,
)

Сценарий 2. Передайте выходные данные (непримитивные типы данных) шага Lambda на шаг обработки.

Непримитивные типы данных относятся к нескалярным типам данных (например, NamedTuple). У вас может возникнуть сценарий, когда вам нужно вернуть непримитивный тип данных из лямбда-функции. Для этого вам необходимо преобразовать непримитивный тип данных в строку:

# Lambda function code returning a non primitive data type

from collections import namedtuple

def lambda_handler(event, context):
    Outputs = namedtuple("Outputs", "sample_output")
    named_tuple = Outputs(
                    (
                        {'output1': 1, 'output2': 2},
                        {'output3': 'foo', 'output4': 'foo1'}
                    )
                )
return{
    "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input”

output_ref = step_lambda.properties.Outputs("named_tuple_string")

Затем вы можете использовать эту строку в качестве входных данных для следующего шага конвейера. Чтобы использовать именованный кортеж в коде, используйте eval() для анализа выражения Python в строке:

# Decipher the string in your processing logic code

import argparse
from collections import namedtuple

Outputs = namedtuple("Outputs", "sample_output")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--named_tuple_string", type=str, required=True)
    args = parser.parse_args()
    #use eval to obtain the named tuple from the string
    named_tuple = eval(args.named_tuple_string)

Сценарий 3. Передайте выходные данные шага через файл свойств.

Вы также можете сохранить выходные данные шага обработки в файле JSON свойств для дальнейшего использования в ConditionStep или другие ProcessingStep. Вы можете использовать Функция JSONGet запросить файл свойств. См. следующий код:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

step_args = sklearn_processor.run(

                outputs=(
                    ProcessingOutput(
                        output_name="hyperparam",
                        source="/opt/ml/processing/evaluation"
                    ),
                ),
            code="./local/preprocess.py",
            arguments=("--input-data", "s3://my-input"),
)

# 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)

Предположим, что содержимое файла свойств было следующим:

{
    "hyperparam": {
        "eta": {
            "value": 0.6
        }
    }
}

В этом случае можно запросить определенное значение и использовать его на последующих шагах с помощью функции JsonGet:

# 3. Query the property file
eta = JsonGet(
    step_name=step_process.name,
    property_file=hyperparam_report,
    json_path="hyperparam.eta.value",
)

Параметризация переменной в определении конвейера

Часто желательно параметризовать переменные, чтобы их можно было использовать во время выполнения, например, для создания URI S3. Вы можете параметризовать строку так, чтобы она оценивалась во время выполнения, используя Join функция. В следующем фрагменте кода показано, как определить переменную с помощью Join и используйте ее для установки местоположения вывода на этапе обработки:

# define the variable to store the s3 URI
s3_location = Join(
    on="/", 
    values=(
        "s3:/",
        ParameterString(
            name="MyBucket", 
            default_value=""
        ),
        "training",
        ExecutionVariables.PIPELINE_EXECUTION_ID
    )
)

# define the processing step
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run(
    outputs=(
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=s3_location,
        ),
    ),
    code="code/preprocess.py"
)

step_process = ProcessingStep(
    name="PreprocessingJob”,
    step_args=processor_run_args,
)

Запуск параллельного кода над итерируемым объектом

Некоторые рабочие процессы машинного обучения выполняют код параллельно в циклах for над статическим набором элементов ( повторяемый). Это может быть либо один и тот же код, который запускается с разными данными, либо другой фрагмент кода, который необходимо запускать для каждого элемента. Например, если у вас очень большое количество строк в файле и вы хотите ускорить время обработки, вы можете положиться на первый шаблон. Если вы хотите выполнить различные преобразования для определенных подгрупп данных, вам, возможно, придется запустить отдельный фрагмент кода для каждой подгруппы данных. Следующие два сценария иллюстрируют, как можно спроектировать конвейеры SageMaker для этой цели.

Сценарий 1. Реализация логики обработки различных частей данных.

Вы можете запустить задание обработки с несколькими экземплярами (установив instance_count до значения больше 1). При этом входные данные из Amazon S3 распределяются по всем экземплярам обработки. Затем вы можете использовать сценарий (process.py) для работы с определенной частью данных на основе номера экземпляра и соответствующего элемента в списке элементов. Логика программирования в файлеprocess.py может быть написана таким образом, что в зависимости от списка элементов, которые он обрабатывает, запускается другой модуль или фрагмент кода. В следующем примере определяется процессор, который можно использовать на этапе обработки:

sklearn_processor = FrameworkProcessor(
    estimator_cls=sagemaker.sklearn.estimator.SKLearn,
    framework_version="0.23-1",
    instance_type="ml.m5.4xlarge",
    instance_count=4, #number of parallel executions / instances
    base_job_name="parallel-step",
    sagemaker_session=session,
    role=role,
)

step_args = sklearn_processor.run(
    code="process.py",
    arguments=(
        "--items", 
        list_of_items, #data structure containing a list of items
        inputs=(
            ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv",
                    destination="/opt/ml/processing/input"
            )
        ),
    )
)

Сценарий 2. Выполнение последовательности шагов.

Если у вас есть последовательность шагов, которые необходимо выполнять параллельно, вы можете определить каждую последовательность как независимый конвейер SageMaker. Запуск этих конвейеров SageMaker затем можно запустить с помощью функции Lambda, которая является частью LambdaStep в родительском конвейере. Следующий фрагмент кода иллюстрирует сценарий, в котором запускаются два разных запуска конвейера SageMaker:

import boto3
def lambda_handler(event, context):
    items = (1, 2)
    #sagemaker client
    sm_client = boto3.client("sagemaker")
    
    #name of the pipeline that needs to be triggered.
    #if there are multiple, you can fetch available pipelines using boto3 api
    #and trigger the appropriate one based on your logic.
    pipeline_name="child-pipeline-1"

    #trigger pipeline for every item
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
                    )
    pipeline_name="child-pipeline-2"
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
                    )
return

Заключение

В этом посте мы обсудили некоторые лучшие практики эффективного использования и обслуживания конвейеров SageMaker. Мы также предоставили определенные шаблоны, которые вы можете использовать при разработке рабочих процессов с помощью SageMaker Pipelines независимо от того, создаете ли вы новые конвейеры или переносите рабочие процессы машинного обучения из других инструментов оркестрации. Чтобы начать работу с SageMaker Pipelines для оркестрации рабочих процессов ML, см. примеры кода на GitHub и конвейеры построения моделей Amazon SageMaker.


Об авторах

Пинак Паниграхи работает с клиентами над созданием решений на основе машинного обучения для решения стратегических бизнес-задач на AWS. Когда он не занят машинным обучением, его можно найти в походе, читающим книгу или смотрящим спортивные состязания.

Минакшисундарам Тандавараян работает в AWS специалистом по AI/ML. У него есть страсть к проектированию, созданию и продвижению ориентированных на человека данных и аналитики. Мина специализируется на разработке устойчивых систем, которые обеспечивают измеримые конкурентные преимущества стратегическим клиентам AWS. Мина является связующим звеном, дизайнерским мыслителем и стремится вывести бизнес на новые способы работы посредством инноваций, инкубации и демократизации.

LEAVE A REPLY

Please enter your comment!
Please enter your name here