Airflow + MLflow: All the Flows
A Classic Duo for Experiment Tracking and DAGs
The classic MLOps implementation is Airflow plus MLflow. DAGs are easy to create, and pulling models out of the MLflow Model Registry once the data scientists have finished model selection from their experiments is about as painless as it gets.
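For context, here is roughly what the data-science side looks like before the pipeline comes into play: log a run and register the chosen model in the registry. This is a minimal sketch, assuming the tracking server below is already running on localhost:5000, scikit-learn is installed, and the registered model name my_model is the one the DAG later looks up.

import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

# Point at the tracking server started by the docker-compose file below
mlflow.set_tracking_uri("http://localhost:5000")

X, y = load_iris(return_X_y=True)
clf = LogisticRegression(max_iter=200).fit(X, y)

# Log the run and register the model under a name the DAG can look up later
with mlflow.start_run():
    mlflow.sklearn.log_model(
        clf,
        artifact_path="model",
        registered_model_name="my_model",
    )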
Create your docker-compose.yml file
version: '3'
services:
  mlflow:
    image: "mlflow/mlflow:latest"
    ports:
      - "5000:5000"
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - MLFLOW_DEFAULT_ARTIFACT_ROOT=/mlruns
    volumes:
      - ./mlruns:/mlruns
    # Run the full server command so MLflow listens on all interfaces
    # inside the container
    command: mlflow server --host 0.0.0.0 --port 5000
  airflow:
    image: "puckel/docker-airflow:latest"
    ports:
      - "8080:8080"
    environment:
      - LOAD_EXAMPLES=False
      # Lets DAGs reach the MLflow tracking server (mlflow itself must also
      # be installed in the Airflow image for the example DAG below)
      - MLFLOW_TRACKING_URI=http://mlflow:5000
    volumes:
      - ./dags:/usr/local/airflow/dags
This file will create two Docker containers: one for MLflow and one for Airflow. The MLflow container will run on port 5000 and the Airflow container will run on port 8080. You will be able to access the MLflow UI at http://localhost:5000 and the Airflow UI at http://localhost:8080.
You can use this docker-compose.yml file to start the containers with the following command:
docker-compose up
This will start both Airflow and MLflow on the ports assigned above.
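Once both containers are up, a quick sanity check (run anywhere mlflow is installed) is to point a client at the tracking server and list what is in the registry. Note that the exact registry API can vary a bit between MLflow versions; this assumes a reasonably recent client.

from mlflow.tracking import MlflowClient

# Talk to the MLflow container published on localhost:5000
client = MlflowClient(tracking_uri="http://localhost:5000")

# If the server and registry are reachable, this prints the registered models
for registered_model in client.search_registered_models():
    print(registered_model.name)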
I have used Airflow and MLflow together quite a bit, so here is an example DAG showing how you can schedule a model deployment with Airflow.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import mlflow
def call_mlflow_model():
    # The client reads MLFLOW_TRACKING_URI from the environment
    # (set on the airflow service in the docker-compose file above)
    client = mlflow.tracking.MlflowClient()

    # Get the latest registered version of the model in the "Production" stage
    # (the exact registry API may differ slightly between MLflow versions)
    latest_prod_version = client.get_latest_versions(
        "my_model", stages=["Production"]
    )[0].version

    # Load the model from the MLflow Model Registry
    model_uri = f"models:/my_model/{latest_prod_version}"
    model = mlflow.pyfunc.load_model(model_uri)

    # Deploy the model
    # (implementation of deployment will depend on your specific use case;
    # see the batch vs. API discussion below)

    # Return the model URI so downstream tasks can pick it up via XCom
    return model_uri
# Define the DAG
default_args = {
"owner": "airflow",
"start_date": days_ago(1),
}
dag = DAG(
"mlflow-deploy",
default_args=default_args,
schedule_interval="@once",
)
# Define the task that gets the latest model version
get_latest_model_version_task = PythonOperator(
task_id="get_latest_model_version",
python_callable=call_mlflow_model,
dag=dag,
)
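This task only finds something if a version of my_model has already been promoted to the Production stage. That promotion usually happens on the data-science side; a minimal sketch with the registry client looks like the following, where the version number is just a placeholder and MLFLOW_TRACKING_URI is assumed to point at the tracking server.

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Promote a specific version so the deployment DAG finds it under "Production"
client.transition_model_version_stage(
    name="my_model",
    version="1",
    stage="Production",
)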
The deployment step can be tailored to the business need, whether that is batch scoring or a real-time API. In either case, the right approach is to call out to cloud compute that runs the model in its own Docker image so the environments stay segregated: the Airflow environment is not the ML deployment environment.
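For the batch case on this setup, one option is to hand the actual scoring off to Airflow's DockerOperator so it runs in its own image rather than on the Airflow workers. This is only a sketch under assumptions: the scorer image, its command, and score.py are hypothetical placeholders, and the worker needs the docker Python package plus access to a Docker daemon.

from airflow.operators.docker_operator import DockerOperator

# Run scoring in a separate container so the Airflow environment and the
# ML runtime environment stay segregated. Image and command are placeholders.
batch_score_task = DockerOperator(
    task_id="batch_score",
    image="my-registry/my-model-scorer:latest",
    command="python score.py --model-uri models:/my_model/Production",
    environment={"MLFLOW_TRACKING_URI": "http://mlflow:5000"},
    auto_remove=True,
    dag=dag,
)

# Score only after the latest Production version has been resolved
get_latest_model_version_task >> batch_score_task

The API case would look much the same, except the container would stand the model up behind a serving endpoint instead of writing out batch scores.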