MLOps Workshop

🔧

Building ML Pipelines with Kubeflow

Create your first end-to-end machine learning pipeline using Kubeflow Pipelines 2.x

โฑ๏ธ 20 minutes ๐Ÿ“š Phase 3 of 7 ๐Ÿ› ๏ธ Hands-on

What You'll Learn

Build a complete ML pipeline that processes data, trains a model, and stores artifacts - all using modern Kubeflow Pipelines 2.x with native Kubernetes support.

🚀

Quick Setup

Lightweight KFP 2.x installation in minutes

🔗

Visual Pipelines

Visual graph view of complex workflows

📊

Artifact Storage

Automatic model and data versioning

🚀

Step 1: Setup & Installation

Install Kubeflow Pipelines 2.x with native Kubernetes support - no complex dependencies!

🧰 Prerequisites & Installation

🚀 Kubeflow Pipelines 2.x Clean Setup

# 🧰 1️⃣ Prerequisites
# Ensure you have:
kubectl version --client
# should be >= v1.24

python3 --version
# should be >= 3.8

# Install the Kubeflow Pipelines SDK:
pip install kfp==2.14.6

# โš™๏ธ 2๏ธโƒฃ Create a Fresh Namespace
kubectl create namespace kubeflow

# Verify:
kubectl get ns

# 🔧 3️⃣ Install cert-manager
# Kubeflow Pipelines uses cert-manager for webhook certificates
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.18.2/cert-manager.yaml
kubectl wait --for=condition=Ready pod -l app.kubernetes.io/instance=cert-manager -n cert-manager --timeout=300s

# 🧩 4️⃣ Install Kubeflow Pipelines (Native Mode)
# We'll install only the lightweight, Kubernetes-native version
export PIPELINE_VERSION=2.14.3

# Install CRDs (required cluster-wide definitions)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"

# Deploy Kubeflow Pipelines (native mode)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"

# Wait for pods to start:
kubectl get pods -n kubeflow -w

# ๐ŸŒ 5๏ธโƒฃ Access the Kubeflow Pipelines UI
kubectl port-forward svc/ml-pipeline-ui 8080:80 -n kubeflow

# Then open → http://localhost:8080

ℹ️ ✅ Expected Pods

You should see these pods running:

• cache-server
• minio
• ml-pipeline (API server)
• ml-pipeline-persistenceagent
• ml-pipeline-scheduledworkflow
• ml-pipeline-ui
• ml-pipeline-viewer-crd
• ml-pipeline-visualizationserver
• workflow-controller

You'll also see supporting pods such as mysql and the metadata-* services.
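
ℹ️ Verify the API (optional)

Before moving on, you can confirm the pipelines backend responds through the port-forward. This is a minimal sketch using the kfp SDK installed earlier; it assumes the port-forward from step 5 is still running in another terminal.

Connectivity Check (Python)

import kfp

# Connect through the port-forwarded ml-pipeline-ui service
client = kfp.Client(host="http://localhost:8080")

# A healthy backend answers with the (initially empty) list of experiments
print(client.list_experiments())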
🔧

Step 2: Build Your Pipeline

Create a taxi price prediction pipeline using modern Kubeflow 2.x syntax with Dataset and Model artifacts.

๐Ÿ Pipeline Code

Save this code as taxi_price_pipeline.py:

Pipeline Code (pipeline.py)
    
from kfp import dsl
from kfp import compiler

# -------------------------
# COMPONENT 1: Download + Prepare Taxi Dataset
# -------------------------
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'kaggle']
)
def prepare_taxi_data(output_csv: dsl.Output[dsl.Dataset]):
    import pandas as pd
    import os
    from zipfile import ZipFile

    # Configure your Kaggle API credentials (create a token under kaggle.com -> Account -> API)
    os.environ['KAGGLE_USERNAME'] = '<your-kaggle-username>'
    os.environ['KAGGLE_KEY'] = '<your-kaggle-api-key>'

    # Download dataset
    os.system("kaggle datasets download -d adelanseur/taxi-trips-chicago-2024 -p .")

    # Extract zip
    for file in os.listdir('.'):
        if file.endswith('.zip'):
            with ZipFile(file, 'r') as zip_ref:
                zip_ref.extractall('.')

    # Load the main data file (find .csv file automatically)
    csv_file = [f for f in os.listdir('.') if f.endswith('.csv')][0]
    df = pd.read_csv(csv_file)

    # Select relevant columns (adapt based on Kaggle column names)
    # Usually columns like: 'Pickup Community Area', 'Dropoff Community Area', 'Trip Total'
    # You may need to adjust these if the Kaggle file uses different names
    selected_cols = ['Pickup Community Area', 'Dropoff Community Area', 'Trip Total']
    df_small = df[selected_cols].rename(columns={
        'Pickup Community Area': 'start_location',
        'Dropoff Community Area': 'end_location',
        'Trip Total': 'price'
    })

    # Drop missing values and sample 5,000 rows for a quick demo
    df_small = df_small.dropna().sample(5000, random_state=42)

    # Save to output dataset
    df_small.to_csv(output_csv.path, index=False)
    print(f"✅ Prepared taxi dataset saved to: {output_csv.path}")

# -------------------------
# COMPONENT 2: Train Fare Model
# -------------------------
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn', 'joblib']
)
def train_fare_model(dataset: dsl.Input[dsl.Dataset], model: dsl.Output[dsl.Model]):
    import pandas as pd
    import joblib
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline

    df = pd.read_csv(dataset.path)
    X = df[['start_location', 'end_location']]
    y = df['price']

    # One-hot encode start and end locations
    categorical_features = ['start_location', 'end_location']
    preprocessor = ColumnTransformer(
        transformers=[('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)]
    )

    # Random Forest Regressor pipeline
    model_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('regressor', RandomForestRegressor(n_estimators=100, random_state=42))
    ])

    model_pipeline.fit(X, y)
    joblib.dump(model_pipeline, model.path)
    print(f"✅ Model trained and saved to: {model.path}")

# -------------------------
# COMPONENT 3: Evaluate Model
# -------------------------
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn', 'joblib']
)
def evaluate_model(dataset: dsl.Input[dsl.Dataset], model: dsl.Input[dsl.Model]) -> float:
    import pandas as pd
    import joblib
    from sklearn.metrics import r2_score

    df = pd.read_csv(dataset.path)
    X = df[['start_location', 'end_location']]
    y_true = df['price']

    model_pipeline = joblib.load(model.path)
    y_pred = model_pipeline.predict(X)
    r2 = r2_score(y_true, y_pred)

    print(f"✅ Model R² score: {r2}")
    return float(r2)

# -------------------------
# PIPELINE DEFINITION
# -------------------------
@dsl.pipeline(
    name='chicago-taxi-fare-pipeline',
    description='Pipeline using Kaggle Chicago Taxi Trips 2024 to predict fare from start/end locations.'
)
def taxi_pipeline():
    data = prepare_taxi_data()
    model = train_fare_model(dataset=data.outputs['output_csv'])
    evaluate_model(dataset=data.outputs['output_csv'], model=model.outputs['model'])

# -------------------------
# COMPILE THE PIPELINE
# -------------------------
if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=taxi_pipeline,
        package_path='chicago_taxi_pipeline.yaml'
    )
    print("✅ Pipeline compiled successfully: chicago_taxi_pipeline.yaml")

    
  

🧱 Compile to YAML

Compile Pipeline
    
# Compile your Python pipeline to YAML:
python taxi_price_pipeline.py

# This generates:
# chicago_taxi_pipeline.yaml
    
  
🚀

Step 3: Deploy & Run Pipeline

Deploy your pipeline to Kubeflow and run it using either the UI or CLI.

๐Ÿ–ฅ๏ธ Option A: Via UI

Upload and run through the web interface:

Deploy via UI
    
# 1. Open Kubeflow Pipelines UI
# Go to: http://localhost:8080

# 2. Upload Pipeline
# Click "Upload pipeline" button
# Select your chicago_taxi_pipeline.yaml file
# Give it a name: "Taxi Price Prediction"

# 3. Create Run
# Click on your uploaded pipeline
# Click "Create Run" button
# Give it a name: "Taxi Price Run v1"
# Click "Start" to execute
    
  

💻 Option B: Via CLI

Submit the compiled pipeline with the KFP SDK (the compiled YAML is a KFP pipeline spec, not a Kubernetes manifest, so it cannot be applied with kubectl):

Deploy via CLI

# With the UI port-forward from Step 1 still running, submit a run from Python:
python3 - <<'EOF'
import kfp

# Connect to the KFP API through the port-forwarded UI service
client = kfp.Client(host='http://localhost:8080')

# Upload the compiled pipeline package and start a run
run = client.create_run_from_pipeline_package(
    'chicago_taxi_pipeline.yaml',
    arguments={},
    run_name='Taxi Price Run v1',
)
print(f'Run submitted: {run.run_id}')
EOF

# Verify run status (each run executes as an Argo Workflow):
kubectl get workflows -n kubeflow
    
  
📊

Step 4: Monitor & Access Artifacts

Track your pipeline execution and access stored models and data.

📊 Monitor Pipeline Execution

Monitor Commands
    
# Check pipeline runs (each run executes as an Argo Workflow):
kubectl get workflows -n kubeflow

# View specific run details:
kubectl describe workflow <workflow-name> -n kubeflow

# Check pod logs:
kubectl logs -f <pod-name> -n kubeflow
    
  

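You can also poll run status from Python using the same KFP client that submitted the run. A minimal sketch, assuming the UI port-forward is running; the commented wait_for_run_completion call needs the run_id printed during submission.

Monitor via the KFP SDK (Python)

import kfp

client = kfp.Client(host="http://localhost:8080")

# List the most recent runs and their states (e.g. RUNNING, SUCCEEDED, FAILED)
for run in client.list_runs(page_size=5).runs or []:
    print(run.display_name, run.state)

# Or block until a specific run finishes:
# result = client.wait_for_run_completion(run_id="<run-id>", timeout=3600)
# print(result.state)
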
📂 Access Stored Artifacts

KFP stores artifacts in MinIO (S3-compatible storage). Access your models and data:

Access MinIO Storage
    
# Port-forward MinIO (the service is named minio-service in the KFP manifests):
kubectl port-forward svc/minio-service 9000:9000 -n kubeflow

# Open http://localhost:9000
# Login credentials:
# Username: minio
# Password: minio123

# Navigate to the mlpipeline bucket to find artifacts under:
# mlpipeline/v2/artifacts/<pipeline-name>/<run-id>/<component-name>/
    
  

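If you prefer scripting to the MinIO console, you can list and download artifacts with the MinIO Python client. A minimal sketch, assuming pip install minio and the port-forward above; the object name in the download call is a placeholder.

Browse Artifacts (Python)

from minio import Minio

# Connect through the port-forward using the default standalone credentials
client = Minio(
    "localhost:9000",
    access_key="minio",
    secret_key="minio123",
    secure=False,
)

# List every artifact object in the mlpipeline bucket
for obj in client.list_objects("mlpipeline", recursive=True):
    print(obj.object_name)

# Download a single object locally, e.g. the trained model artifact
# client.fget_object("mlpipeline", "<object-name>", "model")
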
📊 What You'll Find:

Pipeline UI (localhost:8080):
  • Visual pipeline diagram
  • Real-time execution status
  • Component-level logs
  • Performance metrics

MinIO Storage (localhost:9000):
  • Trained models (.pkl files)
  • Processed datasets (.csv files)
  • Artifact lineage tracking
  • Version history
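
Once you've downloaded the trained model artifact from MinIO (a joblib-serialized scikit-learn pipeline), you can sanity-check it outside the cluster. A minimal sketch; the local filename "model" and the sample community-area codes are placeholders you may need to adjust.

Test the Model Locally (Python)

import joblib
import pandas as pd

# Load the model artifact downloaded from MinIO (adjust the path to wherever you saved it)
model = joblib.load("model")

# Predict the fare for a hypothetical trip between two Chicago community areas
sample = pd.DataFrame({"start_location": [8.0], "end_location": [32.0]})
print(f"Predicted fare: {model.predict(sample)[0]:.2f}")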
🧹

Cleanup (Optional)

When you're done with the workshop, clean up resources to avoid charges.

🧹 Cleanup Commands

# Delete the Kubeflow Pipelines and cert-manager namespaces:
kubectl delete namespace kubeflow cert-manager

# Remove the cluster-scoped resources (re-export PIPELINE_VERSION if you're in a new shell):
export PIPELINE_VERSION=2.14.3
kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
    
  

โš ๏ธ Important:

  • โ€ข This will permanently delete your cluster and all data
  • โ€ข You'll need to recreate everything if you want to continue
  • โ€ข Only run this when you're completely done with the workshop

Ready for the Next Phase?

You've built your first ML pipeline! Now let's learn how to deploy and monitor your models in production.