Airflow, MLflow, All the Flows

A Classic Duo for Experiment Tracking and DAGs

The classic implementation of MLOps is Airflow plus MLflow. It is easy to create DAGs, and pulling models from the MLflow Model Registry once data scientists have finished model selection from their experiments is easy peasy.

Create your docker-compose file

version: '3'
services:
  mlflow:
    image: "mlflow/mlflow:latest"
    ports:
      - "5000:5000"
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - MLFLOW_DEFAULT_ARTIFACT_ROOT=/mlruns
    volumes:
      - ./mlruns:/mlruns
    command: mlflow server --host 0.0.0.0 --port 5000 --default-artifact-root /mlruns
  airflow:
    image: "puckel/docker-airflow:latest"
    ports:
      - "8080:8080"
    environment:
      - LOAD_EXAMPLES=False
      - MLFLOW_TRACKING_URI=http://mlflow:5000
    volumes:
      - ./dags:/usr/local/airflow/dags

This file will create two Docker containers: one for MLflow and one for Airflow. The MLflow container will run on port 5000 and the Airflow container will run on port 8080. You will be able to access the MLflow UI at http://localhost:5000 and the Airflow UI at http://localhost:8080.

You can use this docker-compose.yml file to start the containers with the following command:

docker-compose up

This will start both Airflow and MLflow on the ports assigned above.
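
Once both containers are up, a quick way to sanity-check the tracking server is to log a throwaway run against it. This is only a minimal sketch; the experiment name and values are arbitrary:

import mlflow

# Point the client at the tracking server started by docker-compose
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("smoke-test")

# Log a dummy run; it should appear in the MLflow UI at http://localhost:5000
with mlflow.start_run():
    mlflow.log_param("hello", "world")
    mlflow.log_metric("accuracy", 0.99)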

I have used Airflow and MLflow together quite a bit, so here is an example DAG showing how you can schedule a model deployment with Airflow.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

import mlflow

def call_mlflow_model():
    # Get the latest Production version of the registered model
    # (the exact registry client API depends on your MLflow version)
    client = mlflow.tracking.MlflowClient()
    latest_prod_version = client.get_latest_versions(
        "my_model", stages=["Production"]
    )[0].version

    # Load the model from the MLflow Model Registry
    model_uri = f"models:/my_model/{latest_prod_version}"
    model = mlflow.pyfunc.load_model(model_uri)

    # Deploy the model
    # (implementation of deployment will depend on your specific use case)

    # Return the model URI so downstream tasks can pick it up via XCom
    return model_uri

# Define the DAG
default_args = {
    "owner": "airflow",
    "start_date": days_ago(1),
}

dag = DAG(
    "mlflow-deploy",
    default_args=default_args,
    schedule_interval="@once",
)

# Define the task that gets the latest model version
get_latest_model_version_task = PythonOperator(
    task_id="get_latest_model_version",
    python_callable=call_mlflow_model,
    dag=dag,
)
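
For the task above to find anything, a model named my_model has to be registered and promoted to the Production stage first. A minimal sketch of that step from the data science side (assuming a scikit-learn model and the tracking server from the compose file above) could look like this:

import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

mlflow.set_tracking_uri("http://localhost:5000")

# Train and log a model, registering it under the name the DAG expects
X, y = load_iris(return_X_y=True)
with mlflow.start_run():
    model = LogisticRegression(max_iter=200).fit(X, y)
    mlflow.sklearn.log_model(model, "model", registered_model_name="my_model")

# Promote the newly registered version to the Production stage
client = mlflow.tracking.MlflowClient()
new_version = client.get_latest_versions("my_model", stages=["None"])[0].version
client.transition_model_version_stage(
    name="my_model", version=new_version, stage="Production"
)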

The deployment part can be tailored to the business need, whether batch or API. In either case the right thing to do is to call out to cloud compute that runs the model in its own Docker image, so the environments stay segregated: Airflow env != ML deployment env.
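
For example, with a batch flow you could hand the scoring job off to Airflow's DockerOperator so it runs in its own container. This is only a sketch: the scoring image, its score.py entrypoint, and the model URI convention are hypothetical, and it assumes the docker Python package is available in the Airflow image.

from airflow.operators.docker_operator import DockerOperator

# Run batch scoring inside its own image so the ML runtime never leaks
# into the Airflow environment (image name and command are placeholders)
batch_score_task = DockerOperator(
    task_id="batch_score",
    image="my-registry/my-model-scorer:latest",
    command="python score.py --model-uri models:/my_model/Production",
    environment={"MLFLOW_TRACKING_URI": "http://mlflow:5000"},
    auto_remove=True,
    dag=dag,
)

# Fetch the model version first, then score in the isolated container
get_latest_model_version_task >> batch_score_task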