Building ML Pipelines with Kubeflow
Create your first end-to-end machine learning pipeline using Kubeflow Pipelines 2.x
What You'll Learn
Build a complete ML pipeline that processes data, trains a model, and stores artifacts - all using modern Kubeflow Pipelines 2.x with native Kubernetes support.
Quick Setup
Lightweight KFP 2.x installation in minutes
Visual Pipelines
Interactive graph view of your workflows in the Pipelines UI
Artifact Storage
Automatic model and data versioning
Step 1: Setup & Installation
Install Kubeflow Pipelines 2.x with native Kubernetes support - no complex dependencies!
Prerequisites & Installation
# 1. Prerequisites
# Ensure you have:
kubectl version --client
# should be >= v1.24
python3 --version
# should be >= 3.8
# Install the Kubeflow Pipelines SDK:
pip install kfp==2.14.6
# 2. Create a Fresh Namespace
kubectl create namespace kubeflow
# Verify:
kubectl get ns
# 3. Install cert-manager
# Kubeflow Pipelines uses cert-manager for webhook certificates
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.18.2/cert-manager.yaml
kubectl wait --for=condition=Ready pod -l app.kubernetes.io/instance=cert-manager -n cert-manager --timeout=300s
# 4. Install Kubeflow Pipelines (Native Mode)
# We'll install only the lightweight, Kubernetes-native version
export PIPELINE_VERSION=2.14.3
# Install CRDs (required cluster-wide definitions)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
# Deploy Kubeflow Pipelines (native mode)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"
# Wait for pods to start:
kubectl get pods -n kubeflow -w
# 5. Access the Kubeflow Pipelines UI
kubectl port-forward svc/ml-pipeline-ui 8080:80 -n kubeflow
# Then open http://localhost:8080
Wait until all pods in the kubeflow namespace show a Running status before moving on.
Step 2: Build Your Pipeline
Create a taxi price prediction pipeline using modern Kubeflow 2.x syntax with Dataset and Model artifacts.
Pipeline Code
Save this code as taxi_price_pipeline.py:
from kfp import dsl
from kfp import compiler


# -------------------------
# COMPONENT 1: Download + Prepare Taxi Dataset
# -------------------------
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'kaggle']
)
def prepare_taxi_data(output_csv: dsl.Output[dsl.Dataset]):
    import pandas as pd
    import os
    from zipfile import ZipFile

    # Configure Kaggle API credentials. Replace the placeholders with your own
    # values (or inject them via a Kubernetes secret); never commit real keys.
    os.environ['KAGGLE_USERNAME'] = '<your-kaggle-username>'
    os.environ['KAGGLE_KEY'] = '<your-kaggle-api-key>'

    # Download dataset
    os.system("kaggle datasets download -d adelanseur/taxi-trips-chicago-2024 -p .")

    # Extract zip
    for file in os.listdir('.'):
        if file.endswith('.zip'):
            with ZipFile(file, 'r') as zip_ref:
                zip_ref.extractall('.')

    # Load the main data file (find .csv file automatically)
    csv_file = [f for f in os.listdir('.') if f.endswith('.csv')][0]
    df = pd.read_csv(csv_file)

    # Select relevant columns (adapt based on Kaggle column names)
    # Usually columns like: 'Pickup Community Area', 'Dropoff Community Area', 'Trip Total'
    # You may need to adjust these if the Kaggle file uses different names
    selected_cols = ['Pickup Community Area', 'Dropoff Community Area', 'Trip Total']
    df_small = df[selected_cols].rename(columns={
        'Pickup Community Area': 'start_location',
        'Dropoff Community Area': 'end_location',
        'Trip Total': 'price'
    })

    # Drop missing values and sample 5000 rows
    df_small = df_small.dropna().sample(5000, random_state=42)

    # Save to output dataset
    df_small.to_csv(output_csv.path, index=False)
    print(f"Prepared taxi dataset saved to: {output_csv.path}")


# -------------------------
# COMPONENT 2: Train Fare Model
# -------------------------
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn', 'joblib']
)
def train_fare_model(dataset: dsl.Input[dsl.Dataset], model: dsl.Output[dsl.Model]):
    import pandas as pd
    import joblib
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline

    df = pd.read_csv(dataset.path)
    X = df[['start_location', 'end_location']]
    y = df['price']

    # One-hot encode start and end locations
    categorical_features = ['start_location', 'end_location']
    preprocessor = ColumnTransformer(
        transformers=[('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)]
    )

    # Random Forest Regressor pipeline
    model_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('regressor', RandomForestRegressor(n_estimators=100, random_state=42))
    ])

    model_pipeline.fit(X, y)
    joblib.dump(model_pipeline, model.path)
    print(f"Model trained and saved to: {model.path}")


# -------------------------
# COMPONENT 3: Evaluate Model
# -------------------------
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['pandas', 'scikit-learn', 'joblib']
)
def evaluate_model(dataset: dsl.Input[dsl.Dataset], model: dsl.Input[dsl.Model]) -> float:
    import pandas as pd
    import joblib
    from sklearn.metrics import r2_score

    df = pd.read_csv(dataset.path)
    X = df[['start_location', 'end_location']]
    y_true = df['price']

    model_pipeline = joblib.load(model.path)
    y_pred = model_pipeline.predict(X)

    r2 = r2_score(y_true, y_pred)
    print(f"Model R² score: {r2}")
    return float(r2)


# -------------------------
# PIPELINE DEFINITION
# -------------------------
@dsl.pipeline(
    name='chicago-taxi-fare-pipeline',
    description='Pipeline using Kaggle Chicago Taxi Trips 2024 to predict fare from start/end locations.'
)
def taxi_pipeline():
    data = prepare_taxi_data()
    model = train_fare_model(dataset=data.outputs['output_csv'])
    evaluate_model(dataset=data.outputs['output_csv'], model=model.outputs['model'])


# -------------------------
# COMPILE THE PIPELINE
# -------------------------
if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=taxi_pipeline,
        package_path='chicago_taxi_pipeline.yaml'
    )
    print("Pipeline compiled successfully: chicago_taxi_pipeline.yaml")
Compile to YAML
# Compile your Python pipeline to YAML:
python taxi_price_pipeline.py
# This generates:
# chicago_taxi_pipeline.yaml
Step 3: Deploy & Run Pipeline
Deploy your pipeline to Kubeflow and run it using either the UI or CLI.
Option A: Via UI
Upload and run through the web interface:
# 1. Open Kubeflow Pipelines UI
# Go to: http://localhost:8080
# 2. Upload Pipeline
# Click "Upload pipeline" button
# Select your chicago_taxi_pipeline.yaml file
# Give it a name: "Taxi Price Prediction"
# 3. Create Run
# Click on your uploaded pipeline
# Click "Create Run" button
# Give it a name: "Taxi Price Run v1"
# Click "Start" to execute
Option B: Via CLI
Deploy directly using kubectl commands:
# Create the pipeline & run directly:
kubectl apply -f chicago_taxi_pipeline.yaml -n kubeflow
kubectl apply -f - <<EOF
apiVersion: pipelines.kubeflow.org/v2beta1
kind: PipelineRun
metadata:
  generateName: taxi-price-run-
  namespace: kubeflow
spec:
  pipelineRef:
    name: chicago-taxi-fare-pipeline
EOF
# Verify run status:
kubectl get pipelineruns -n kubeflow
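If you prefer staying in Python, the KFP SDK installed in Step 1 can also submit the compiled package directly to the API server. This is a minimal sketch, assuming the ml-pipeline-ui port-forward from Step 1 is still running on port 8080; the run name is just an example:
from kfp import Client

# Connect to the Pipelines API through the Step 1 port-forward
client = Client(host='http://localhost:8080')

# Submit the compiled package as a new run (this pipeline takes no parameters)
run = client.create_run_from_pipeline_package(
    'chicago_taxi_pipeline.yaml',
    arguments={},
    run_name='taxi-price-run-sdk',  # hypothetical run name
)
print(f"Started run: {run.run_id}")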
Step 4: Monitor & Access Artifacts
Track your pipeline execution and access stored models and data.
Monitor Pipeline Execution
# Check pipeline runs:
kubectl get pipelineruns -n kubeflow
# View specific run details:
kubectl describe pipelinerun <run-name> -n kubeflow
# Check pod logs:
kubectl logs -f <pod-name> -n kubeflow
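You can also check run status from Python instead of kubectl. A small sketch, again assuming the UI port-forward on localhost:8080 (the response attributes follow the v2beta1 API objects, so adjust if your SDK version differs):
from kfp import Client

client = Client(host='http://localhost:8080')

# List the most recent runs and print their display names and current state
response = client.list_runs(page_size=10, sort_by='created_at desc')
for run in response.runs or []:
    print(run.display_name, run.state)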
Access Stored Artifacts
KFP stores artifacts in MinIO (S3-compatible storage). Access your models and data:
# Port-forward MinIO:
kubectl port-forward svc/minio 9000:9000 -n kubeflow
# Open http://localhost:9000
# Login credentials:
# Username: minio
# Password: minio123
# Navigate to mlpipeline bucket to see:
# mlpipeline/artifacts/<pipeline>/<run-id>/<component>/
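Because MinIO speaks the S3 API, you can also pull artifacts down programmatically instead of using the web console. This sketch uses boto3, which is not part of the workshop setup (pip install boto3 first), and assumes the port-forward, bucket name, and default credentials shown above:
import boto3

# Connect to MinIO through the port-forward using the default minio/minio123 credentials
s3 = boto3.client(
    's3',
    endpoint_url='http://localhost:9000',
    aws_access_key_id='minio',
    aws_secret_access_key='minio123',
)

# List artifact objects stored by pipeline runs
for obj in s3.list_objects_v2(Bucket='mlpipeline').get('Contents', []):
    print(obj['Key'])

# Download one artifact locally (the key below is a hypothetical example path)
# s3.download_file('mlpipeline', '<run-artifact-key>', 'model.pkl')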
What You'll Find:
Pipeline UI (localhost:8080):
- Visual pipeline diagram
- Real-time execution status
- Component-level logs
- Performance metrics
MinIO Storage (localhost:9000):
- Trained models (.pkl files)
- Processed datasets (.csv files)
- Artifact lineage tracking
- Version history
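Once you've downloaded a trained model artifact (for example, saved locally as model.pkl, a hypothetical file name), you can load it outside the cluster and sanity-check predictions. The community-area codes below are illustrative only:
import joblib
import pandas as pd

# Load the downloaded scikit-learn pipeline (hypothetical local file name)
model_pipeline = joblib.load('model.pkl')

# Predict fares for a couple of example start/end community areas
sample_trips = pd.DataFrame({
    'start_location': [8.0, 32.0],
    'end_location': [76.0, 8.0],
})
print(model_pipeline.predict(sample_trips))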
Cleanup (Optional)
When you're done with the workshop, clean up the resources you created (especially if you're running on a paid cloud cluster).
# Delete everything (make sure PIPELINE_VERSION is still set from Step 1):
kubectl delete namespace kubeflow cert-manager
kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
Important:
- This permanently deletes the kubeflow and cert-manager namespaces, including all pipeline runs and stored artifacts
- You'll need to reinstall everything if you want to continue
- Only run this when you're completely done with the workshop
Ready for the Next Phase?
You've built your first ML pipeline! Now let's learn how to deploy and monitor your models in production.