5.1. Orquestre um projeto de ciência de dados em Python com Prefect#

5.1.1. O que é Prefect?#

Prefect é um framework de código aberto para criar fluxos de trabalho em Python. O Prefect facilita a criação, execução e monitoramento de pipelines de dados em escala.

Para instalar o Prefect, digite:

pip install prefect

Se estiver usando o Poetry, então digite:

poetry add prefect

5.1.2. Construa seu fluxo de trabalho com Prefect#

Para saber como o Prefect funciona, vamos encapsular o fluxo de trabalho apresentado acima.

5.1.2.1. Primeiro Passo — Criar Tarefas#

Uma Task é uma ação discreta em um fluxo. Comece transformando as funções definidas acima em tarefas usando o decorador @task :

from prefect import task
from typing import Any, Dict, List
import pandas as pd

@task
def load_data(path: str) -> pd.DataFrame:
    ...

@task
def get_classes(data: pd.DataFrame, target_col: str) -> List[str]:
    """Função que retorna as classes do conjunto de dados Iris."""
    ...

@task
def encode_categorical_columns(data: pd.DataFrame, target_col: str) -> pd.DataFrame:
    """Função que codifica as colunas categóricas do conjunto de dados Iris."""
    ...

@task
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
    """Função para dividir o conjunto de dados Iris em conjuntos de treino e teste."""
    ...

5.1.2.2. Segunda Etapa — Criar um Fluxo#

Um Flow representa todo o fluxo de trabalho gerenciando as dependências entre as tarefas. Para criar um flow, basta inserir o código para executar suas funções dentro do gerenciador de contexto with Flow(...).

from prefect import task, Flow

with Flow("data-engineer") as flow:
    
    # Defina os parâmetros
    target_col = 'species'
    test_data_ratio = 0.2
    
    # Defina as tarefas
    data = load_data(path="data/raw/iris.csv")
    classes = get_classes(data=data, target_col=target_col) 
    categorical_columns = encode_categorical_columns(data=data, target_col=target_col)
    train_test_dict = split_data(data=categorical_columns, test_data_ratio=test_data_ratio, classes=classes)

Observe que nenhuma dessas tarefas é executada ao executar o código acima. O Prefect permite que você execute o flow imediatamente ou agende para mais tarde.

Vamos tentar executar o flow imediatamente usando flow.run() :

with Flow("data-engineer") as flow:
  # Defina seu fluxo aqui
  ...
  
flow.run()

A execução do código acima fornecerá uma saída semelhante a esta:

└── 15:49:46 | INFO    | Beginning Flow run for 'data-engineer'
└── 15:49:46 | INFO    | Task 'target_col': Starting task run...
└── 15:49:46 | INFO    | Task 'target_col': Finished task run for task with final state: 'Success'
└── 15:49:46 | INFO    | Task 'test_data_ratio': Starting task run...
└── 15:49:47 | INFO    | Task 'test_data_ratio': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Task 'load_data': Starting task run...
└── 15:49:47 | INFO    | Task 'load_data': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Task 'encode_categorical_columns': Starting task run...
└── 15:49:47 | INFO    | Task 'encode_categorical_columns': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Task 'get_classes': Starting task run...
└── 15:49:47 | INFO    | Task 'get_classes': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Task 'split_data': Starting task run...
└── 15:49:47 | INFO    | Task 'split_data': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!

Para entender o fluxo de trabalho criado pelo Prefect, vamos visualizar todo o fluxo de trabalho.

Comece instalando prefect[viz]:

pip install "prefect[viz]"

Em seguida, adicione o método visualize ao código:

flow.visualize()

E você deve ver a visualização do fluxo de trabalho do data-engineer como abaixo:

Observe que o Prefect gerencia automaticamente as ordens de execução entre as tarefas para que o fluxo de trabalho seja otimizado. Isso é muito legal para alguns pedaços de código adicionais!

5.1.2.3. Terceiro passo — Adicionar parâmetros#

Se você está experimentando frequentemente valores diferentes de uma variável, é ideal transformar essa variável em um Parameter:

test_data_ratio = 0.2
train_test_dict = split_data(data=categorical_columns, 
                            test_data_ratio=test_data_ratio, 
                            classes=classes)

Você pode considerar um Parameter como uma Task , exceto que ele pode receber entradas do usuário sempre que um fluxo for executado. Para transformar uma variável em um parâmetro, basta usar Parameter:

from prefect import task, Flow, Parameter 

test_data_ratio = Parameter("test_data_ratio", default=0.2)

train_test_dict = split_data(data=categorical_columns, 
                            test_data_ratio=test_data_ratio, 
                            classes=classes)

O primeiro argumento de Parameter especifica o nome do parâmetro. default é um argumento opcional que especifica o valor padrão do parâmetro.

Executar flow.visualize novamente nos dará uma saída como abaixo:

Você pode substituir o parâmetro padrão para cada execução:

  • adicionando o argumento parameters a flow.run():

$ flow.run(parameters={'test_data_ratio': 0.3})
  • ou usando o Prefect CLI:

$ prefect run -p data_engineering.py --param test_data_ratio=0.2 
  • ou usando um arquivo JSON:

$ prefect run -p data_engineering.py --param-file='params.json'

Seu arquivo JSON deve parecer com isso:

{"test_data_ratio": 0.3}

Você também pode alterar os parâmetros para cada execução usando o Prefect Cloud, que será apresentado na próxima seção.

5.1.3. Monitore seu fluxo de trabalho#

5.1.3.1. Visão geral#

O Prefect também permite monitorar seu fluxo de trabalho no Prefect Cloud. Siga esta instrução para instalar dependências relevantes para Prefect Cloud.

Depois que todas as dependências estiverem instaladas e configuradas, comece criando um projeto no Prefect executando:

$ prefect create project "Iris Project"

Em seguida, inicie um agente local para implantar nossos fluxos localmente em uma única máquina:

$ prefect agent local start

Em seguida, no final do seu arquivo, adicione:

flow.register(project_name="Iris Project")

Você deve ver algo parecido com o abaixo:

Flow URL: https://cloud.prefect.io/khuyentran1476-gmail-com-s-account/flow/dba26bea-8827-4db4-9988-3289f6cb662f
 └── ID: 2944dc36-cdac-4079-8497-be4ec5594785
 └── Project: Iris Project
 └── Labels: ['khuyen-Precision-7740']

Clique na URL na saída e você será redirecionado para uma página de visão geral. A página Visão geral mostra a versão do seu fluxo, quando ele é criado, o histórico de execução do fluxo e seu resumo de execuções:

Você também pode visualizar o resumo de outras execuções, quando são executadas e suas configurações:

É muito legal como essas informações importantes são rastreadas automaticamente pelo Prefect!

5.1.3.2. Executar o fluxo de trabalho com parâmetros padrão#

Observe que o fluxo de trabalho está registrado no Prefect Cloud, mas ainda não foi executado. Para executar o fluxo de trabalho com os parâmetros padrão, clique em Quick Run no canto superior direito:

Clique na execução que é criada. Agora você poderá ver a atividade do seu novo fluxo executado em tempo real!

5.1.3.3. Execute o fluxo de trabalho com parâmetros personalizados#

Para executar o fluxo de trabalho com parâmetros personalizados, clique na guia Run e altere os parâmetros em Inputs:

Quando estiver satisfeito com os parâmetros, basta clicar no botão Run para iniciar a execução.

5.1.3.4. Visualize o gráfico do fluxo de trabalho#

Clicar em Schematic fornecerá o gráfico de todo o fluxo de trabalho:

5.1.4. Outras características#

Além de alguns recursos básicos mencionados acima, o Prefect também fornece outros recursos interessantes que aumentarão significativamente a eficiência do seu fluxo de trabalho.

5.1.4.1. Cache de entrada#

Lembra do problema que mencionamos no início da seção? Normalmente, se a função get_classes falhar, os dados criados pela função encode_categorical_columns serão descartados e todo o fluxo de trabalho precisa ser iniciado desde o início.

No entanto, com o Prefect, a saída de encode_categorical_columns é armazenada. Na próxima vez que o fluxo de trabalho for executado novamente, a saída de encode_categorical_columns será usada pela próxima tarefa sem executar novamente a tarefa encode_categorical_columns:

Isso pode resultar em uma grande redução no tempo necessário para executar o fluxo de trabalho.

5.1.4.2. Saída persistente#

Às vezes, você pode querer exportar os dados da sua tarefa para um local externo. Isso pode ser feito inserindo na função de tarefa o código para salvar os dados.

def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
  
  X_train, X_test, y_train, y_test = ...
  
  import pickle
  pickle.save(...)

No entanto, fazer isso dificultará o teste da função.

O Prefect facilita salvar a saída de uma tarefa para cada execução:

  • definindo o checkpoint para True

$ export PREFECT__FLOWS__CHECKPOINTING=true
  • e adicionando result = LocalResult(dir=...)) para o decorator @task.

from prefect.engine.results import LocalResult

@task(result = LocalResult(dir='data/processed'))
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
    """Função para dividir o conjunto de dados Iris em conjuntos de treino e teste."""
    X_train, X_test, y_train, y_test = ...
    
    return dict(
        train_x=X_train,
        train_y=y_train,
        test_x=X_test,
        test_y=y_test,

Agora a saída da tarefa split_data será salva no diretório data/processed ! O nome será algo semelhante a isto:

prefect-result-2021-11-06t15-37-29-605869-00-00

Se você quiser personalizar o nome do seu arquivo, você pode adicionar o argumento target a @task :


from prefect.engine.results import LocalResult

@task(target="{date:%a_%b_%d_%Y_%H:%M:%S}/{task_name}_output", 
      result = LocalResult(dir='data/processed'))
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
    """Função para dividir o conjunto de dados Iris em conjuntos de treino e teste."""
    ...

O Prefect também fornece outras classes Result como GCSResult e S3Result. Você pode conferir a documentação da API sobre resultados aqui.

5.1.4.3. Use a saída de outro fluxo para o fluxo atual#

Se você estiver trabalhando com vários fluxos, por exemplo, fluxo data-engineer e fluxo data-science, convém usar a saída do fluxo data-engineer para o fluxo data-science.

Após salvar a saída do seu fluxo data-engineer como um arquivo, você pode ler esse arquivo usando o método read:

from prefect.engine.results import LocalResult

train_test_dict = LocalResult(dir=...).read(location=...).value

5.1.4.4. Conectar fluxos dependentes#

Imagine este cenário: você criou dois fluxos que dependem um do outro. O fluxo data-engineer precisa ser executado antes do fluxo data-science

Alguém que olhou para o seu fluxo de trabalho não entendeu a relação entre esses dois fluxos. Como resultado, eles executaram o fluxo data-science e o fluxo data-engineer ao mesmo tempo, e encontraram um erro!

Para evitar que isso aconteça, devemos especificar a relação entre os fluxos. Felizmente, O Prefect facilita para fazermos isso.

Comece pegando dois fluxos diferentes usando StartFlowRun. Adicione wait=True ao argumento para que o fluxo seguinte seja executado somente depois que o fluxo anterior terminar de ser executado.

from prefect import Flow 
from prefect.tasks.prefect import StartFlowRun

data_engineering_flow = StartFlowRun(flow_name="data-engineer", 
                                    project_name='Iris Project',
                                    wait=True)
                                    
data_science_flow = StartFlowRun(flow_name="data-science", 
                                project_name='Iris Project',
                                wait=True)

Em seguida, chamando data_science_flow no gerenciador de contexto with Flow(...). Use upstream_tasks para especificar as tarefas/fluxos que serão executados antes que o fluxo data-science seja executado.

with Flow("main-flow") as flow:
    result = data_science_flow(upstream_tasks=[data_engineering_flow])
    
flow.run()

Agora os dois fluxos estão conectados como abaixo:

Muito legal!

5.1.4.5. Agende seu fluxo#

O Prefect também facilita a execução de um fluxo em um determinado momento ou em um determinado intervalo.

Por exemplo, para executar um fluxo a cada 1 minuto, você pode iniciar a classe IntervalSchedule e adicionar schedule ao gerenciador de contexto with Flow(...):

from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(minutes=1),
)

data_engineering_flow = ...
data_science_flow = ...


with Flow("main-flow", schedule=schedule) as flow:
    data_science = data_science_flow(upstream_tasks=[data_engineering_flow])

Agora seu fluxo será executado novamente a cada 1 minuto!

Saiba mais sobre as diferentes maneiras de programar seu fluxo aqui.

5.1.4.6. Logging#

Você pode registrar as declarações do print dentro de uma tarefa simplesmente adicionando log_stdout=True a @task:

@task(log_stdout=True)
def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:

  target = ... 
  accuracy = ...
    
  print(f"Model accuracy on test set: {round(accuracy * 100, 2)}")

E você deve ver uma saída como abaixo ao executar a tarefa:

[2021-11-06 11:41:16-0500] INFO - prefect.TaskRunner | Model accuracy on test set: 93.33