MLOps Interview Questions 2026: 28 Answers with Code

What changed in 2026 hiring drives
Mass-recruiter offer letters are flatter for the 2026 batch - the 4-5 LPA ASE band has barely budged in three years while inflation eats real wages. Premium tracks (Digital, Pro, Elite, Specialist) are still where the differential lives, and they are entirely test-driven. If you are aiming higher than the default offer, the coding round is not optional pageantry - it is the entire interview.
What I'd actually study for this
- Two solid coding-round answers (1 medium-hard DSA each, with edge-case discussion) > five half-baked ones
- One real project you can defend end-to-end - file paths, design decisions, and what you would change
- One DBMS schema you actually built (not a textbook ER diagram), with at least 3 join-heavy queries written from memory
- Three behavioural STAR stories: failure recovered, conflict handled, ownership taken
Where most candidates trip up
The single biggest mistake is treating company-specific guides as primary prep and DSA as secondary. It is the opposite. Mass recruiters use the test as a filter, but premium tracks at every IT services company use coding to allocate offer band. Spend 70% of prep time on DSA + system fundamentals, 20% on company-specific patterns, 10% on HR rehearsal. Reverse that ratio and you collect the default offer.
Editorial commentary by Aditya Sharma · written for PapersAdda · not generated, not aggregated.
MLOps is the fastest-growing specialization at the intersection of ML and software engineering. Companies have moved from proof-of-concept models to production systems that require continuous training, monitoring, and deployment automation. Candidates report that MLOps roles at Databricks, Google, Amazon, and major Indian tech unicorns are among the best-compensated engineering positions in 2026. This guide covers 28 MLOps interview questions with full answers and production-grade code.
PapersAdda's take: MLOps interviews test whether you can operate an ML system, not just train a model. The questions are heavily scenario-based: "Your model's accuracy dropped 8% last week. Walk me through your debugging process." Prepare for those scenarios. According to candidate accounts from public preparation resources, monitoring and drift detection questions appear in over 70% of senior MLOps rounds. Confirm the specific tooling stack expected on the official company careers portal before preparing.
Which Companies Ask MLOps Questions?
| Company / Role | MLOps Focus |
|---|---|
| Databricks, Snowflake | ML pipeline orchestration, lakehouse integration |
| Google (GCP ML) | Vertex AI pipelines, TFX, model serving |
| Amazon (SageMaker) | Managed ML pipelines, endpoints, monitoring |
| Netflix, Spotify, Uber | Large-scale model serving, feature stores |
| Indian unicorns (Meesho, Swiggy, PhonePe) | Production recommendation and fraud ML |
EASY: Core MLOps Concepts (Questions 1-8)
Q1. What is MLOps? How does it differ from DevOps?
| Dimension | DevOps | MLOps |
|---|---|---|
| Artifact | Application code | Code + Data + Model |
| Testing | Unit, integration, E2E | + Data validation, model evaluation |
| Deployment | Rolling, blue-green | + Canary, shadow, A/B, champion-challenger |
| Monitoring | Latency, error rate | + Data drift, model degradation, bias |
| Trigger for redeployment | Code change | + Data drift, performance drop |
| Key challenge | Reproducibility of builds | Reproducibility of experiments |
MLOps extends DevOps to handle the unique properties of ML systems: data and model versioning, experiment tracking, and continuous evaluation.
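To make the "Code + Data + Model" artifact row concrete, here is a minimal sketch of a run fingerprint that changes whenever the code, the data, or the hyperparameters change. The function name `fingerprint` and the sample inputs are illustrative assumptions, not part of any library; real systems usually delegate this to tools such as DVC or MLflow.

```python
import hashlib
import json


def fingerprint(code_sha: str, data_bytes: bytes, params: dict) -> str:
    """Return a short, deterministic ID for one (code, data, params) combination.

    Hypothetical helper for illustration only.
    """
    h = hashlib.sha256()
    h.update(code_sha.encode("utf-8"))
    h.update(hashlib.sha256(data_bytes).digest())
    # sort_keys makes the hash independent of dict insertion order
    h.update(json.dumps(params, sort_keys=True).encode("utf-8"))
    return h.hexdigest()[:12]


# Same code, data, and params (in a different key order) give the same ID
run_a = fingerprint("3f2c1ab", b"user_id,churned\n1,0\n", {"lr": 0.05, "depth": 5})
run_b = fingerprint("3f2c1ab", b"user_id,churned\n1,0\n", {"depth": 5, "lr": 0.05})
# One changed label in the data gives a different ID
run_c = fingerprint("3f2c1ab", b"user_id,churned\n1,1\n", {"lr": 0.05, "depth": 5})
print(run_a == run_b, run_a == run_c)  # True False
```

In an interview, the point to make is that a build is reproducible from a git SHA alone, while an experiment needs all three inputs pinned.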
Q2. What is experiment tracking? Implement an MLflow run.
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score

# Assumes X_train, y_train, X_val, y_val are already loaded (X_* as pandas DataFrames)
mlflow.set_tracking_uri("http://localhost:5000")  # or use mlruns/ locally
mlflow.set_experiment("churn-prediction")

with mlflow.start_run(run_name="gb-v1") as run:
    # Log hyperparameters
    params = {'n_estimators': 500, 'learning_rate': 0.05, 'max_depth': 5}
    mlflow.log_params(params)
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)
    # Log metrics
    val_auc = roc_auc_score(y_val, model.predict_proba(X_val)[:, 1])
    mlflow.log_metric("val_roc_auc", val_auc)
    mlflow.log_metric("train_roc_auc",
                      roc_auc_score(y_train, model.predict_proba(X_train)[:, 1]))
    # Log model with signature
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(model, "model", signature=signature,
                             registered_model_name="churn-gb")
    # Log artifacts (feature importance plot, etc.); the file must already exist on disk
    mlflow.log_artifact("feature_importance.png")
    mlflow.log_dict({"features": list(X_train.columns)}, "features.json")
    print(f"Val AUC: {val_auc:.4f}")
    # Read the run ID inside the block: mlflow.active_run() returns None once the run has ended
    print(f"Run ID: {run.info.run_id}")
Q3. What is a feature store? Why is it important?
- Training-serving skew: Features computed differently at training vs. serving time lead to subtle bugs; a feature store serves both from one definition
- Reusability: Feature engineering work is shared across teams and models
from feast import FeatureStore
from datetime import datetime
import pandas as pd
# Initialize store from feature_store.yaml
store = FeatureStore(repo_path="feature_repo/")
# Retrieve features for training (point-in-time correct joins)
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003],
"event_timestamp": [
datetime(2026, 1, 15), datetime(2026, 1, 20), datetime(2026, 1, 25)
]
})
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:total_spend_90d",
"user_stats:login_count_7d",
"product_features:avg_price_category"
]
).to_df()
# Serve features at inference time (low-latency)
feature_vector = store.get_online_features(
features=["user_stats:total_spend_90d", "user_stats:login_count_7d"],
entity_rows=[{"user_id": 1001}]
).to_dict()
Q4. What is model versioning? How do you manage the model registry?
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Register a model
run_id = "abc123def456"
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "churn-predictor")
# Transition model stages (stage-based workflow; deprecated since MLflow 2.9 in favor of model version aliases)
client.transition_model_version_stage(
name="churn-predictor",
version=3,
stage="Staging"
)
# Load model by stage
model = mlflow.sklearn.load_model("models:/churn-predictor/Staging")
# Compare models
latest_versions = client.get_latest_versions("churn-predictor", stages=["Production", "Staging"])
for v in latest_versions:
    print(f"Version {v.version} ({v.current_stage}): {v.run_id}")
# Promote after validation
client.transition_model_version_stage(
name="churn-predictor",
version=3,
stage="Production",
archive_existing_versions=True # archive previous Production version
)
Q5. What is a CI/CD pipeline for ML? What stages does it include?
CI/CD for ML (unlike software CI/CD, must handle data + model + code):
Stage 1: Code validation
- Lint (flake8, black)
- Unit tests (pytest) for feature transforms, preprocessing
- Type checking (mypy)
Stage 2: Data validation (new relative to software CI/CD)
- Schema check (Great Expectations)
- Distribution drift vs reference stats
- Label quality checks
Stage 3: Model training (on small data slice for CI speed)
- Run training pipeline
- Check train metric above baseline threshold
Stage 4: Model evaluation
- Compare new model vs champion (current Production)
- Slice-based evaluation (performance by subgroup)
- Bias/fairness checks (Aequitas, Fairlearn)
Stage 5: Model packaging and staging deployment
- Package as Docker image
- Deploy to Staging (shadow mode or canary 1%)
- Load test serving endpoint
Stage 6: Production deployment
- Gradual traffic rollout (canary)
- Monitor for 30 minutes before full promotion
- Rollback trigger if metrics degrade
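Stage 4 above is where most pipelines encode their promotion policy. A minimal sketch of such a gate follows, assuming each evaluation report is a dict with an overall `auc` and a per-slice `slices` mapping; the function name, the report shape, and the default tolerances are illustrative assumptions.

```python
def promotion_gate(champion: dict, challenger: dict,
                   min_gain: float = 0.0,
                   max_slice_drop: float = 0.02) -> tuple[bool, list[str]]:
    """Decide whether a challenger may replace the champion.

    Hypothetical report shape: {"auc": float, "slices": {slice_name: auc}}.
    Returns (passed, reasons_for_failure).
    """
    reasons = []
    # Overall metric must match or beat the champion by at least min_gain
    if challenger["auc"] < champion["auc"] + min_gain:
        reasons.append(
            f"overall AUC {challenger['auc']:.3f} below required "
            f"{champion['auc'] + min_gain:.3f}"
        )
    # No slice may regress by more than max_slice_drop (absolute AUC)
    for name, champ_auc in champion["slices"].items():
        chall_auc = challenger["slices"].get(name)
        if chall_auc is None:
            reasons.append(f"slice '{name}' missing from challenger report")
        elif champ_auc - chall_auc > max_slice_drop:
            reasons.append(f"slice '{name}' dropped {champ_auc - chall_auc:.3f} AUC")
    return (not reasons, reasons)


champ = {"auc": 0.85, "slices": {"IN": 0.84, "US": 0.86}}
good = {"auc": 0.87, "slices": {"IN": 0.85, "US": 0.86}}
bad = {"auc": 0.87, "slices": {"IN": 0.80, "US": 0.88}}
print(promotion_gate(champ, good)[0])  # True
print(promotion_gate(champ, bad)[0])   # False: better overall, but the IN slice regressed
```

The second case is the one interviewers tend to probe: a model can improve the headline metric while hurting a subgroup, which is why the slice check sits in the gate rather than in a dashboard.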
Q6. What are the deployment strategies for ML models?
| Strategy | How | Risk | Use When |
|---|---|---|---|
| Blue-green | Maintain two full environments; switch traffic | Low (instant rollback) | Stateless models, cost-insensitive |
| Canary | Route X% traffic to new model; increase gradually | Low | Default for production ML |
| Shadow | New model runs in parallel, receives same requests, results discarded | None | Testing high-stakes models |
| A/B test | Route user cohorts to different models; measure outcome | Medium (users see different quality) | Evaluating business impact |
| Champion-challenger | Production model (champion) vs new model (challenger) in A/B | Medium | Continuous model improvement |
import random
class ModelRouter:
    def __init__(self, champion, challenger, challenger_fraction=0.1):
        self.champion = champion
        self.challenger = challenger
        self.challenger_fraction = challenger_fraction

    def predict(self, features):
        if random.random() < self.challenger_fraction:
            result = self.challenger.predict(features)
            model_version = "challenger"
        else:
            result = self.champion.predict(features)
            model_version = "champion"
        # Log to metrics store for comparison
        self._log(model_version, result)
        return result

    def _log(self, version, result):
        pass  # write to analytics DB for A/B comparison
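The router above picks a model at random on every request, so the same user can bounce between champion and challenger. For A/B tests that measure user-level outcomes, assignment is usually made sticky. A minimal sketch, assuming a string user ID and a hypothetical per-experiment salt (`assign_variant` and `salt` are illustrative names):

```python
import hashlib


def assign_variant(user_id: str, challenger_fraction: float, salt: str = "exp-1") -> str:
    """Deterministically map a user to 'champion' or 'challenger'.

    Hashing (salt, user_id) gives a stable bucket in [0, 1), so a user always
    sees the same model for a given experiment. Changing the salt reshuffles users.
    """
    digest = hashlib.sha256(f"{salt}:{user_id}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64  # uniform in [0, 1)
    return "challenger" if bucket < challenger_fraction else "champion"


# Same user, same answer on every call
first = assign_variant("user-1001", 0.1)
assert all(assign_variant("user-1001", 0.1) == first for _ in range(5))

# Across many users the challenger share lands close to the configured fraction
share = sum(assign_variant(f"user-{i}", 0.1) == "challenger" for i in range(10_000)) / 10_000
print(round(share, 2))
```

Raising `challenger_fraction` during a canary rollout only moves users from champion to challenger, never back, because each user's bucket is fixed.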
Q7. What is data drift and how do you detect it?
| Type | What Shifts | Detection |
|---|---|---|
| Covariate shift | P(X) changes; P(Y\|X) stays the same | KS test or PSI on input features |
| Label drift | P(Y) changes | Monitor prediction distribution |
| Concept drift | P(Y\|X) changes | Monitor live performance against ground-truth labels |
| Upstream data drift | Schema or values change upstream | Schema + value range monitoring |
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from scipy.stats import ks_2samp, chi2_contingency
import pandas as pd
import numpy as np
# Evidently for automated drift reports
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=X_train_sample, current_data=X_production_sample)
report.save_html("drift_report.html")
# Manual KS test for continuous features
def check_continuous_drift(train_series, prod_series, alpha=0.05):
    stat, pval = ks_2samp(train_series, prod_series)
    return {'drift': pval < alpha, 'ks_stat': stat, 'p_value': pval}

# PSI (Population Stability Index) for model scores
def compute_psi(expected, actual, n_bins=10):
    eps = 1e-10
    # Bin edges come from the reference (expected) distribution's percentiles
    bins = np.percentile(expected, np.linspace(0, 100, n_bins+1))
    bins[0], bins[-1] = -np.inf, np.inf
    exp_pct = np.histogram(expected, bins=bins)[0] / len(expected) + eps
    act_pct = np.histogram(actual, bins=bins)[0] / len(actual) + eps
    psi = np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct))
    # PSI < 0.1: no drift; 0.1-0.2: moderate; > 0.2: major drift
    return psi
Q8. How do you containerize an ML model for deployment?
# Dockerfile for ML model serving
FROM python:3.11-slim
WORKDIR /app
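# Install system dependencies first (Docker layer caching); curl is required by the HEALTHCHECK below and is not in the slim image
RUN apt-get update && apt-get install -y --no-install-recommends libgomp1 curl && rm -rf /var/lib/apt/lists/*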
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY app/ app/
COPY models/ models/
# Create non-root user
RUN useradd -m -u 1000 mluser && chown -R mluser:mluser /app
USER mluser
EXPOSE 8080
HEALTHCHECK --interval=30s --timeout=10s CMD curl -f http://localhost:8080/health || exit 1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
# FastAPI serving application
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.sklearn
import numpy as np

app = FastAPI()
model = mlflow.sklearn.load_model("models:/churn-predictor/Production")

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    probability: float
    prediction: int
    model_version: str = "production"

@app.get("/health")
def health():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    try:
        x = np.array(request.features).reshape(1, -1)
        prob = float(model.predict_proba(x)[0, 1])
        return PredictionResponse(probability=prob, prediction=int(prob >= 0.5))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
MEDIUM: Production ML Systems (Questions 9-20)
Q9. How do you implement model monitoring in production?
from collections import deque
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Prometheus metrics
PREDICTION_COUNT = Counter('ml_predictions_total', 'Total predictions', ['model', 'version'])
PREDICTION_LATENCY = Histogram('ml_prediction_latency_seconds', 'Prediction latency')
PREDICTION_SCORE_AVG = Gauge('ml_prediction_score_avg', 'Rolling average prediction score')
DATA_DRIFT_PSI = Gauge('ml_data_drift_psi', 'PSI for feature drift', ['feature'])

class MonitoredModel:
    def __init__(self, model, model_name, version):
        self.model = model
        self.model_name = model_name
        self.version = version
        # Bounded buffer: keeps only the latest 1000 scores so memory does not grow without limit
        self.score_buffer = deque(maxlen=1000)

    def predict(self, x):
        start = time.perf_counter()
        score = self.model.predict_proba(x)[:, 1]
        elapsed = time.perf_counter() - start
        # Record metrics
        PREDICTION_COUNT.labels(self.model_name, self.version).inc(len(x))
        PREDICTION_LATENCY.observe(elapsed)
        # Rolling average score (track prediction distribution)
        self.score_buffer.extend(score.tolist())
        if len(self.score_buffer) == self.score_buffer.maxlen:
            PREDICTION_SCORE_AVG.set(sum(self.score_buffer) / len(self.score_buffer))
        return score

# Alerting thresholds
ALERT_THRESHOLDS = {
    'accuracy_drop_pct': 5,   # alert if accuracy drops > 5%
    'drift_psi': 0.2,         # alert if PSI > 0.2 (major drift)
    'p99_latency_ms': 100,    # alert if P99 latency > 100ms
    'error_rate_pct': 0.1     # alert if >0.1% prediction errors
}
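Thresholds like the ones above only help if something evaluates them. The sketch below shows one plain-Python way to turn a snapshot of observed metrics into a list of breached alerts; in practice this logic usually lives in Prometheus alerting rules rather than application code. The function name `breached_alerts` and the sample snapshot are assumptions for illustration.

```python
# Same thresholds as in the monitoring example, repeated so this block runs on its own
ALERT_THRESHOLDS = {
    "accuracy_drop_pct": 5,
    "drift_psi": 0.2,
    "p99_latency_ms": 100,
    "error_rate_pct": 0.1,
}


def breached_alerts(observed: dict, thresholds: dict = ALERT_THRESHOLDS) -> list[str]:
    """Return the names of metrics whose observed value exceeds its threshold."""
    breached = []
    for name, limit in thresholds.items():
        value = observed.get(name)
        # A missing metric is skipped here; a stricter policy could alert on it instead
        if value is not None and value > limit:
            breached.append(name)
    return breached


snapshot = {
    "accuracy_drop_pct": 2.1,
    "drift_psi": 0.27,
    "p99_latency_ms": 140,
    "error_rate_pct": 0.03,
}
print(breached_alerts(snapshot))  # ['drift_psi', 'p99_latency_ms']
```

For the "accuracy dropped 8% last week" scenario, a snapshot like this gives the order of investigation: drift and latency fired, so start with the input data and the serving path before touching the model.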
Q10. What is training-serving skew and how do you prevent it?
Root causes:
- Transform code copy-pasted (diverges over time)
- Different libraries or versions
- Different data freshness
- Missing values handled differently
Prevention:
# The gold standard: use the SAME code path for training and serving
# Strategy 1: Feature store (same features for both)
# Strategy 2: Serialize preprocessing inside the model artifact
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
import joblib
# Bundle preprocessing and model together
preprocessor = ColumnTransformer([
('num', StandardScaler(), numerical_cols),
('cat', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1), cat_cols)
])
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('model', GradientBoostingClassifier(n_estimators=300))
])
full_pipeline.fit(X_train_raw, y_train) # raw input (no preprocessing outside pipeline)
joblib.dump(full_pipeline, 'pipeline.pkl')
# At serving time: pass raw features
pipeline = joblib.load('pipeline.pkl')
predictions = pipeline.predict(raw_features) # same preprocessing as training
# Audit: periodically compare training pipeline output with serving pipeline output
# using the same raw inputs
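The audit mentioned in the last comment can be sketched as a row-by-row comparison of features computed by the offline (training) path and the online (serving) path for the same raw inputs. This is a minimal illustration; `skew_report` and the sample rows are hypothetical names and data, and a real audit would sample from logged serving requests.

```python
import math


def skew_report(offline_rows: list[dict], online_rows: list[dict],
                rel_tol: float = 1e-6) -> list[tuple]:
    """Compare feature dicts row by row and list every mismatch.

    Each mismatch is (row_index, feature_name, offline_value, online_value).
    """
    if len(offline_rows) != len(online_rows):
        raise ValueError("both paths must be run on the same set of raw inputs")
    mismatches = []
    for i, (off, on) in enumerate(zip(offline_rows, online_rows)):
        for name in sorted(set(off) | set(on)):
            a, b = off.get(name), on.get(name)
            if a is None or b is None:
                # Feature present in only one path (or null in one of them)
                mismatches.append((i, name, a, b))
            elif isinstance(a, (int, float)) and isinstance(b, (int, float)):
                if not math.isclose(a, b, rel_tol=rel_tol, abs_tol=1e-9):
                    mismatches.append((i, name, a, b))
            elif a != b:
                mismatches.append((i, name, a, b))
    return mismatches


offline = [{"total_spend_90d": 120.5, "login_count_7d": 3, "country": "IN"}]
online = [{"total_spend_90d": 120.5, "login_count_7d": 4, "country": "IN"}]
print(skew_report(offline, online))  # [(0, 'login_count_7d', 3, 4)]
```

A mismatch on a windowed count like this one often points to the "different data freshness" root cause: the two paths disagree about where the 7-day window ends.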
Q11. How do you design a model retraining pipeline?
Retraining pipeline design:
Trigger conditions (any of these):
1. Scheduled (weekly or monthly)
2. Drift detected (PSI > 0.2 or KS p-value < 0.01)
3. Performance drop (AUC drops > 5% vs baseline)
4. New labeled data available (batch labeling completed)
Pipeline steps:
1. Data ingestion (feature store pull, label join)
2. Data validation (Great Expectations schema + distribution)
3. Feature computation (run transform pipeline)
4. Model training (hyperparameter tuning if major drift, else fixed params)
5. Evaluation (compare to current production model on holdout)
6. Model registration (MLflow, version bump)
7. Staging deployment (shadow serving)
8. Promotion (if staging metrics pass thresholds)
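The trigger conditions listed above can be sketched as a single decision function. This assumes the monitoring system already produces the signals; the `Signals` dataclass, its field names, and the weekly default are illustrative, and the 5% AUC drop is read as relative to the baseline.

```python
from dataclasses import dataclass


@dataclass
class Signals:
    """Hypothetical snapshot of monitoring signals for one model."""
    days_since_last_train: int
    max_feature_psi: float   # largest PSI across monitored features
    min_ks_pvalue: float     # smallest KS-test p-value across features
    baseline_auc: float
    current_auc: float
    new_labels_ready: bool


def retrain_reasons(s: Signals, schedule_days: int = 7) -> list[str]:
    """Return every trigger that fired; an empty list means no retraining is needed."""
    reasons = []
    if s.days_since_last_train >= schedule_days:
        reasons.append("schedule")
    if s.max_feature_psi > 0.2 or s.min_ks_pvalue < 0.01:
        reasons.append("drift")
    # Relative drop versus baseline (assumption: "5%" means relative, not absolute)
    if (s.baseline_auc - s.current_auc) / s.baseline_auc > 0.05:
        reasons.append("performance")
    if s.new_labels_ready:
        reasons.append("new_labels")
    return reasons


print(retrain_reasons(Signals(3, 0.25, 0.50, 0.90, 0.89, False)))  # ['drift']
print(retrain_reasons(Signals(8, 0.05, 0.50, 0.90, 0.80, True)))   # ['schedule', 'performance', 'new_labels']
```

Returning the list of reasons rather than a boolean lets the pipeline choose step 4's behaviour, for example running hyperparameter tuning only when "drift" is among them.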
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Using Apache Beam for data ingestion + transformation
# (compute_features and validate_row are project-specific functions defined elsewhere)
def run_training_pipeline(data_path, model_output_path, run_date):
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',
        '--region=us-central1'
    ])
    with beam.Pipeline(options=options) as p:
        data = (
            p
            | 'ReadData' >> beam.io.ReadFromParquet(data_path)
            | 'FilterDate' >> beam.Filter(lambda row: row['date'] >= run_date)
            | 'ComputeFeatures' >> beam.Map(compute_features)
            | 'FilterValid' >> beam.Filter(validate_row)
        )
        # ... rest of pipeline

# Kubeflow pipeline (production standard in 2026)
import kfp
from kfp import dsl

@dsl.component(base_image='python:3.11', packages_to_install=['scikit-learn', 'mlflow'])
def train_model(data_path: str, model_output: dsl.Output[dsl.Model]):
    import mlflow.sklearn
    # ... training code
    pass

# evaluate_model and deploy_if_better are components defined the same way (omitted here)
@dsl.pipeline(name='churn-retraining')
def retraining_pipeline(data_path: str = 'gs://bucket/data/'):
    train_task = train_model(data_path=data_path)
    evaluate_task = evaluate_model(model=train_task.outputs['model_output'])
    deploy_task = deploy_if_better(evaluation=evaluate_task.outputs['metrics'])
Q12. What is BentoML? How do you use it for model serving?
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON

# Note: this is the BentoML 1.0/1.1 Runner API; releases from 1.2 onward favor @bentoml.service classes.
# Save model to BentoML model store. Expose predict_proba, because the API below returns
# class probabilities (predict would return a 1-D array of labels and probs[:, 1] would fail).
bentoml.sklearn.save_model("churn_model", trained_model,
                           signatures={"predict_proba": {"batchable": True}})

# Define service
runner = bentoml.sklearn.get("churn_model:latest").to_runner()
svc = bentoml.Service("churn_service", runners=[runner])

@svc.api(input=NumpyNdarray(dtype="float32", shape=(-1, 20)),
         output=JSON())
async def predict(input_data: np.ndarray) -> dict:
    probs = await runner.predict_proba.async_run(input_data)
    return {
        "probabilities": probs[:, 1].tolist(),
        "predictions": (probs[:, 1] >= 0.5).astype(int).tolist()
    }

# Build and containerize
# bentoml build -> bentoml containerize churn_service:latest
Q13. How do you run load testing for an ML serving endpoint?
# Using Locust for ML endpoint load testing
from locust import HttpUser, task, between
import numpy as np

class MLModelUser(HttpUser):
    wait_time = between(0.05, 0.2)  # 50-200 ms between requests per simulated user

    def on_start(self):
        # Pre-generate random feature vectors
        self.feature_batch = np.random.randn(64, 20).astype('float32')

    @task
    def predict_single(self):
        # One flat vector of 20 floats, matching a list[float] request schema
        features = np.random.randn(20).astype('float32')
        response = self.client.post(
            "/predict",
            json={"features": features.tolist()}
        )
        assert response.status_code == 200

    @task(weight=3)
    def predict_batch(self):
        # Assumes the endpoint also accepts a 2-D batch (64 x 20); adjust to your API's schema
        response = self.client.post(
            "/predict",
            json={"features": self.feature_batch.tolist()}
        )

# Run: locust -f locustfile.py --host=http://model-api --users=100 --spawn-rate=10
# Targets for a production ML API:
# P50 latency: < 20ms
# P99 latency: < 100ms
# Throughput: > 1,000 RPS per instance
# Error rate: < 0.1%
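To check a load-test run against the latency targets above, the percentiles can be computed from the raw latency samples. A minimal sketch using only the standard library; the function names and the sample data are illustrative, and Locust's own report already gives these percentiles if you prefer to read them there.

```python
import statistics


def latency_summary(latencies_ms: list[float]) -> dict:
    """Return P50 and P99 from raw latency samples (needs at least two samples)."""
    # quantiles(n=100) returns the 99 cut points P1..P99
    cuts = statistics.quantiles(latencies_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p99": cuts[98]}


def meets_targets(summary: dict, p50_target_ms: float = 20.0,
                  p99_target_ms: float = 100.0) -> bool:
    """Check a summary against the P50/P99 targets listed above."""
    return summary["p50"] < p50_target_ms and summary["p99"] < p99_target_ms


# 98 fast requests and 2 slow ones: the median looks fine, the tail does not
samples = [10.0] * 98 + [150.0] * 2
summary = latency_summary(samples)
print(summary)                 # {'p50': 10.0, 'p99': 150.0}
print(meets_targets(summary))  # False
```

The example is deliberately lopsided: averages and medians hide the 2% of slow requests, which is why serving SLOs are stated at P99.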
Q14. What is an ML metadata store? What does it track?
| Entity | What Is Tracked |
|---|---|
| Data | Path, schema, statistics, splits, version |
| Feature | Computation code, source data, statistics |
| Model | Architecture, hyperparameters, training data version, metrics |
| Experiment | Parameters, metrics, code version (git SHA) |
| Deployment | Model version, deployment time, endpoint, traffic |
| Prediction | Input features, output scores, timestamp, model version |
from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2 as mlmd_pb2
# Create metadata store
connection_config = mlmd_pb2.ConnectionConfig()  # use the alias imported above
connection_config.sqlite.filename_uri = '/tmp/mlmd.db'
connection_config.sqlite.connection_mode = 3  # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)
# Register artifact types
dataset_type = mlmd_pb2.ArtifactType()
dataset_type.name = "Dataset"
dataset_type_id = store.put_artifact_type(dataset_type)
# Record a training dataset artifact
dataset = mlmd_pb2.Artifact()
dataset.uri = "gs://bucket/train_data/2026-06-08/"
dataset.type_id = dataset_type_id
dataset_id = store.put_artifacts([dataset])[0]
Q15. How do you implement shadow deployment for safe model rollout?
import asyncio
import json
import logging
import time
from dataclasses import dataclass
from typing import Any

@dataclass
class PredictionResult:
    score: float
    latency_ms: float
    model_version: str

class ShadowDeploymentProxy:
    """Routes requests to champion; concurrently fires shadow requests."""
    def __init__(self, champion_model, shadow_model, shadow_log_path):
        self.champion = champion_model
        self.shadow = shadow_model
        self.log_path = shadow_log_path
        # Keep strong references so pending shadow tasks are not garbage-collected
        self._tasks = set()

    async def predict(self, features: Any) -> PredictionResult:
        # Champion prediction (returned to caller)
        t0 = time.perf_counter()
        champion_score = float(self.champion.predict_proba([features])[0, 1])
        champion_latency = (time.perf_counter() - t0) * 1000
        # Shadow prediction (fire-and-forget, does not block caller)
        task = asyncio.create_task(self._shadow_predict(features, champion_score))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return PredictionResult(
            score=champion_score,
            latency_ms=champion_latency,
            model_version="champion"
        )

    async def _shadow_predict(self, features, champion_score):
        try:
            t0 = time.perf_counter()
            # Run the blocking model call in a worker thread so the event loop stays free
            probs = await asyncio.to_thread(self.shadow.predict_proba, [features])
            shadow_score = float(probs[0, 1])
            shadow_latency = (time.perf_counter() - t0) * 1000
            self._log_shadow(features, champion_score, shadow_score, shadow_latency)
        except Exception as e:
            # A shadow failure must never affect the champion response
            logging.error(f"Shadow prediction failed: {e}")

    def _log_shadow(self, features, champion, shadow, latency):
        # Log to analytics for offline comparison
        with open(self.log_path, 'a') as f:
            f.write(json.dumps({'champion': champion, 'shadow': shadow,
                                'latency': latency}) + '\n')
Q16. What is Triton Inference Server? When do you use it?
# Client call to Triton
import tritonclient.http as triton_http
import numpy as np
client = triton_http.InferenceServerClient(url="localhost:8000")
# Check model status
print(client.is_model_ready("my_model"))
# Prepare inputs
inputs = [triton_http.InferInput("input__0", [1, 224, 224, 3], "FP32")]
inputs[0].set_data_from_numpy(image_array.astype(np.float32))
# Run inference
outputs = [triton_http.InferRequestedOutput("output__0")]
response = client.infer("my_model", inputs, outputs=outputs)
predictions = response.as_numpy("output__0")
# Dynamic batching config (in the model's config.pbtxt, protobuf text format):
# dynamic_batching {
#   preferred_batch_size: [8, 16, 32]
#   max_queue_delay_microseconds: 5000
# }
# When to use Triton:
# - Multi-model serving on single GPU
# - Mixed-framework serving (some PyTorch, some ONNX, some TensorRT)
# - High-throughput serving where batching efficiency matters
# - Ensemble models (chain multiple models)
Q17. How do you implement online feature computation for real-time serving?
import redis
import json
from datetime import datetime

class OnlineFeatureStore:
    """Redis-backed online feature store for sub-millisecond feature serving."""
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.r = redis.Redis(host=redis_host, port=redis_port,
                             decode_responses=True, socket_connect_timeout=2)

    def get_user_features(self, user_id: str) -> dict:
        key = f"user_features:{user_id}"
        raw = self.r.get(key)
        if raw is None:
            return self._compute_default_features(user_id)
        return json.loads(raw)

    def set_user_features(self, user_id: str, features: dict, ttl_seconds: int = 3600):
        key = f"user_features:{user_id}"
        self.r.setex(key, ttl_seconds, json.dumps(features))

    def batch_get_features(self, user_ids: list) -> dict:
        # One round trip for all keys via a Redis pipeline
        pipe = self.r.pipeline()
        for uid in user_ids:
            pipe.get(f"user_features:{uid}")
        results = pipe.execute()
        return {
            uid: json.loads(r) if r else self._compute_default_features(uid)
            for uid, r in zip(user_ids, results)
        }

    def _compute_default_features(self, user_id: str) -> dict:
        return {"total_spend_90d": 0.0, "login_count_7d": 0, "days_since_signup": 0}

# Feature pipeline: Kafka consumer writes features to Redis
from confluent_kafka import Consumer
import json

def feature_update_consumer():
    consumer = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'feature-updater',
        'auto.offset.reset': 'latest'
    })
    consumer.subscribe(['user-events'])
    store = OnlineFeatureStore()
    while True:
        msg = consumer.poll(1.0)
        if msg and not msg.error():
            event = json.loads(msg.value())
            features = compute_incremental_features(event)
            store.set_user_features(event['user_id'], features)
Q18. What is Great Expectations and how do you use it for data validation?
import great_expectations as gx
import pandas as pd

# Note: this uses the pre-1.0 (0.15-0.17 era) Great Expectations API; GX 1.x restructured these calls.
context = gx.get_context()
# Create expectation suite
suite = context.add_expectation_suite("training_data_suite")
# Define expectations (the validator is bound to the suite created above)
validator = context.get_validator(
    datasource_name="my_postgres",
    data_connector_name="default_inferred_data_connector",
    data_asset_name="churn_features",
    expectation_suite_name="training_data_suite"
)
# Column existence
validator.expect_column_to_exist("user_id")
validator.expect_column_to_exist("total_spend_90d")
# Data quality
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("total_spend_90d", min_value=0)
validator.expect_column_values_to_be_in_set("country", ["IN", "US", "GB", "DE"])
# Distribution (statistical)
validator.expect_column_mean_to_be_between("total_spend_90d", min_value=100, max_value=5000)
validator.expect_column_stdev_to_be_between("total_spend_90d", min_value=50)
# Run validation
results = validator.validate()
if not results["success"]:
    failed_expectations = [r for r in results["results"] if not r["success"]]
    for r in failed_expectations:
        print(f"FAILED: {r['expectation_config']['expectation_type']}")
    raise ValueError(f"Data validation failed: {len(failed_expectations)} expectations failed")
Q19. What is ONNX and how does it enable cross-framework deployment?
import torch
import onnx
import onnxruntime as ort
import numpy as np
# Export PyTorch model to ONNX
model.eval()
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
model,
dummy_input,
"model.onnx",
input_names=["input"],
output_names=["output"],
dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
opset_version=17,
do_constant_folding=True # folds constant sub-expressions at export time
)
# Validate ONNX model
onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)
# Run with ONNX Runtime (CPU or CUDA)
ort_session = ort.InferenceSession("model.onnx",
providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
input_name = ort_session.get_inputs()[0].name
outputs = ort_session.run(None, {input_name: dummy_input.numpy()})
print(f"ONNX output shape: {outputs[0].shape}")
# ONNX Runtime speedup vs PyTorch eager: often 1.5-2x for inference
# TensorRT (NVIDIA): additional 2-4x via hardware-specific optimization
Q20. How do you handle model rollback in production?
# Model version management with instant rollback
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

def rollback_model(model_name: str, target_version: int):
    """Roll back to a specific model version."""
    # Get current production version
    current_prod = client.get_latest_versions(model_name, stages=["Production"])
    current_version = current_prod[0].version if current_prod else None
    print(f"Rolling back {model_name} from v{current_version} to v{target_version}")
    # Archive current production
    if current_version:
        client.transition_model_version_stage(
            name=model_name,
            version=current_version,
            stage="Archived"
        )
    # Promote target version to production
    client.transition_model_version_stage(
        name=model_name,
        version=target_version,
        stage="Production"
    )
    print(f"Rollback complete: v{target_version} is now Production")

# Automatic rollback based on metrics
class AutoRollback:
    def __init__(self, model_name, alert_threshold_drop=0.05):
        self.model_name = model_name
        self.threshold = alert_threshold_drop
        self.baseline_auc = None

    def check_and_rollback(self, current_auc: float, previous_version: int):
        # The first observation becomes the baseline
        if self.baseline_auc is None:
            self.baseline_auc = current_auc
            return
        # Relative drop versus the baseline
        drop = (self.baseline_auc - current_auc) / self.baseline_auc
        if drop > self.threshold:
            print(f"AUC dropped {drop:.1%}. Triggering rollback.")
            rollback_model(self.model_name, previous_version)
HARD: Advanced MLOps (Questions 21-28)
Q21. How do you implement real-time feature computation with sub-10ms latency?
Architecture for sub-10ms feature serving:
Layer 1: Redis hot cache (< 0.5ms)
- Precomputed features for active users
- TTL: 1-24 hours depending on feature freshness requirement
- Typical hit rate: 80-95% for recommendation/personalization
Layer 2: In-memory feature compute (1-5ms)
- Lightweight compute on request payload
- Example: "time since last login" from request metadata
- No network call needed
Layer 3: Feature store API (5-15ms)
- Feast or Tecton serving endpoint
- Fall through if Redis cache miss
Layer 4: Default/fallback features (< 0.1ms)
- Use population median values
- For new users with no history
import asyncio
import json
import redis.asyncio as aioredis

class AsyncFeatureStore:
    def __init__(self):
        self.redis = None

    async def setup(self):
        self.redis = await aioredis.from_url("redis://localhost:6379")

    async def get_features_parallel(self, user_id: str, item_id: str) -> dict:
        # Fetch user and item features concurrently
        user_key = f"user:{user_id}"
        item_key = f"item:{item_id}"
        user_raw, item_raw = await asyncio.gather(
            self.redis.get(user_key),
            self.redis.get(item_key)
        )
        user_feats = json.loads(user_raw) if user_raw else self._default_user()
        item_feats = json.loads(item_raw) if item_raw else self._default_item()
        return {**user_feats, **item_feats}

    def _default_user(self):
        return {"age": 30, "spend_90d": 0, "logins_7d": 1}

    def _default_item(self):
        return {"avg_price": 500, "category_id": 0}
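The fall-through between the hot cache (Layer 1), the feature store API (Layer 3), and the default features (Layer 4) described in the architecture above can be sketched without any infrastructure. In this illustration plain dicts stand in for Redis and the feature store, and `layered_lookup`, `Lookup`, and `DEFAULTS` are hypothetical names.

```python
from typing import Callable, Optional

# A lookup takes a user ID and returns a feature dict, or None on a miss
Lookup = Callable[[str], Optional[dict]]

DEFAULTS = {"spend_90d": 0.0, "logins_7d": 0}  # stand-in for population-level defaults


def layered_lookup(user_id: str, layers: list[tuple[str, Lookup]],
                   defaults: dict) -> tuple[str, dict]:
    """Try each layer in order and return (layer_name, features) from the first hit."""
    for name, lookup in layers:
        try:
            features = lookup(user_id)
        except Exception:
            features = None  # a failing or timed-out layer is treated as a miss
        if features is not None:
            return name, features
    return "default", dict(defaults)


# Dict-backed stand-ins for the real stores
hot_cache = {"u1": {"spend_90d": 40.0, "logins_7d": 5}}
feature_api = {"u2": {"spend_90d": 12.0, "logins_7d": 1}}
layers = [("redis", hot_cache.get), ("feature_store", feature_api.get)]

print(layered_lookup("u1", layers, DEFAULTS)[0])  # redis
print(layered_lookup("u2", layers, DEFAULTS)[0])  # feature_store
print(layered_lookup("u9", layers, DEFAULTS)[0])  # default
```

Returning the layer name alongside the features makes the cache hit rate and the share of default-served requests easy to export as metrics, which is what tells you whether the latency budget is realistic.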
Q22. What is continuous training (CT) and how is it different from continuous delivery (CD)?
| Pipeline | Trigger | Artifact | Goal |
|---|---|---|---|
| Continuous Integration (CI) | Code change | Tested code | Catch bugs early |
| Continuous Delivery (CD) | Merge to main | Deployable build | Fast, reliable deploys |
| Continuous Training (CT) | New data or drift | Trained model | Keep model fresh |
| Continuous Evaluation (CE) | New labels | Performance metrics | Know if model is working |
# Continuous training pipeline with Prefect (2.x/3.x API)
from prefect import flow, task

@task(retries=3, retry_delay_seconds=60)
def fetch_new_training_data(since_date: str) -> str:
    # Pull labeled data from warehouse
    return "gs://bucket/training_data/"

@task
def validate_data(data_path: str) -> bool:
    # Run Great Expectations suite
    return True

@task
def train_model(data_path: str) -> str:
    # Train and return MLflow run ID
    return "run_abc123"

@task
def evaluate_model(run_id: str) -> float:
    # Return validation AUC
    return 0.87

@task
def deploy_if_better(run_id: str, new_auc: float, min_improvement: float = 0.001):
    import mlflow
    client = mlflow.MlflowClient()
    # Compare to production model
    # ... deploy logic ...
    pass

@flow
def weekly_retraining_flow():
    data_path = fetch_new_training_data(since_date="last_week")
    if validate_data(data_path):
        run_id = train_model(data_path)
        auc = evaluate_model(run_id)
        deploy_if_better(run_id, auc)

if __name__ == "__main__":
    # In Prefect 2.x/3.x the schedule belongs to the deployment, not the @flow decorator
    weekly_retraining_flow.serve(name="weekly-retraining", cron="0 2 * * 1")  # every Monday 2 AM
Q23. How do you implement slice-based evaluation for fairness?
import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score

def slice_evaluation(model, X_test, y_test, slice_columns: list, min_slice_size=50):
    """
    Evaluate model performance on data slices (demographic groups, regions, etc.)
    Surfaces groups where model underperforms.
    """
    results = []
    y_true = np.asarray(y_test)
    y_scores = model.predict_proba(X_test)[:, 1]
    overall_auc = roc_auc_score(y_true, y_scores)
    for col in slice_columns:
        for value in X_test[col].unique():
            mask = (X_test[col] == value).to_numpy()
            n = int(mask.sum())
            if n < min_slice_size:
                continue
            # AUC is undefined when a slice contains only one class
            if len(np.unique(y_true[mask])) < 2:
                continue
            slice_auc = roc_auc_score(y_true[mask], y_scores[mask])
            disparity = slice_auc - overall_auc  # negative = underperformance
            results.append({
                'slice_col': col,
                'slice_value': value,
                'n': n,
                'auc': round(slice_auc, 4),
                'disparity_vs_overall': round(disparity, 4),
                'flag': disparity < -0.05  # flag if slice AUC is more than 0.05 (absolute) below overall
            })
    df = pd.DataFrame(results).sort_values('auc')
    flagged = df[df['flag']]
    if len(flagged) > 0:
        print(f"WARNING: {len(flagged)} underperforming slices detected:")
        print(flagged.to_string())
    return df

# Typical slices to evaluate:
# - Gender, age_group, state/region (for credit/churn)
# - Device type, operating system (for recommendation)
# - Business size, industry (for B2B models)
Q24. What is the Google ML Test Score and what does a mature ML system look like?
| Area | Tests | What Is Checked |
|---|---|---|
| Features and data | Data schema, range, distribution | No silent data failures |
| Model development | Offline evaluation, training performance | Reproducibility, benchmarks |
| ML infrastructure | Training pipeline, serving pipeline | Can retrain and redeploy reliably |
| Monitoring | Alerting, dashboards, staleness | Know when model is failing |
Mature ML system checklist:
Data:
[x] Feature distributions monitored continuously
[x] Schema validation on every data ingestion
[x] Training-serving skew check (automated comparison)
[x] Data lineage tracked (what data trained which model)
Model:
[x] Offline metrics regression test (new model must beat baseline)
[x] Slice-based evaluation (no hidden underperformance on subgroups)
[x] Reproducible training (same code + data = same model)
[x] Unit tests for preprocessing transforms
Infrastructure:
[x] Model can be retrained in < 4 hours (data to deployed endpoint)
[x] Rollback to any previous version in < 5 minutes
[x] Canary deployment with automatic abort on metric degradation
[x] Load test for 2x expected peak traffic
Monitoring:
[x] Prediction latency P50/P99 alerts
[x] Feature drift alerts (PSI, KS test)
[x] Model performance alerts (accuracy, AUC)
[x] Upstream data freshness alerts
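The "Feature drift alerts (PSI, KS test)" item can be made concrete with a small Population Stability Index calculation. This is a minimal standard-library sketch, assuming equal-width bins derived from the reference (training) sample and a small `eps` floor to avoid taking the log of zero; the function name `psi` and its defaults are illustrative choices, not a library API.

```python
import math
from typing import List, Sequence


def psi(expected: Sequence[float], actual: Sequence[float],
        n_bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index between a reference sample and a live sample."""
    if not expected or not actual:
        raise ValueError("both samples must be non-empty")
    lo, hi = min(expected), max(expected)
    if hi == lo:
        raise ValueError("reference sample has no spread")
    width = (hi - lo) / n_bins

    def fractions(values: Sequence[float]) -> List[float]:
        counts = [0] * n_bins
        for v in values:
            idx = int((v - lo) / width)
            idx = min(max(idx, 0), n_bins - 1)  # clamp out-of-range values into edge bins
            counts[idx] += 1
        # Floor each fraction at eps so empty bins do not produce log(0)
        return [max(c / len(values), eps) for c in counts]

    e, a = fractions(expected), fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


reference = [i / 100 for i in range(100)]       # roughly uniform on [0, 1)
shifted = [0.5 + i / 200 for i in range(100)]   # mass moved to the upper half
print(psi(reference, reference), psi(reference, shifted))
```

A commonly quoted rule of thumb treats PSI above roughly 0.25 as a significant shift, but alert thresholds are better tuned per feature against historical week-over-week values.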
Q25. How do you implement model explainability at scale?
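import shap
import numpy as np

class ScalableExplainer:
    """SHAP-based explainer with caching for high-throughput serving."""
    def __init__(self, model, background_data, n_background=500):
        self.explainer = shap.TreeExplainer(model,
                                            data=background_data[:n_background])
        self.cache = {}  # request_id -> explanation; unbounded, so cap or evict in production

    def explain(self, x: np.ndarray, request_id: str = None, cache=True) -> dict:
        # Return a previously computed explanation for this request if one exists
        if cache and request_id is not None and request_id in self.cache:
            return self.cache[request_id]
        # SHAP values for a single prediction (x is one row of features)
        x = np.atleast_2d(x)
        shap_values = self.explainer.shap_values(x)
        if isinstance(shap_values, list):
            # Multi-class: use class 1 for binary classification
            sv = shap_values[1]
        elif np.ndim(shap_values) == 3:
            # Some SHAP versions return shape (n_samples, n_features, n_classes)
            sv = shap_values[:, :, 1]
        else:
            sv = shap_values
        # expected_value may be a scalar, a length-1 array, or one value per class
        expected = np.atleast_1d(self.explainer.expected_value)
        explanation = {
            'base_value': float(expected[1] if expected.size > 1 else expected[0]),
            'feature_contributions': {
                f'feature_{i}': float(sv[0, i]) for i in range(sv.shape[1])
            },
            'top_3_drivers': self._top_drivers(sv[0])
        }
        if cache and request_id is not None:
            self.cache[request_id] = explanation
        return explanation

    def _top_drivers(self, shap_row: np.ndarray) -> list:
        # Rank features by absolute contribution, largest first
        idx = np.argsort(np.abs(shap_row))[::-1][:3]
        return [{'feature_idx': int(i), 'contribution': float(shap_row[i])} for i in idx]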
Q26. What is the difference between batch scoring and real-time scoring architectures?
| Property | Batch Scoring | Real-time Scoring |
|---|---|---|
| Trigger | Scheduled (hourly/daily) | User request (on-demand) |
| Latency | Minutes to hours | < 100ms |
| Throughput | Millions of records | 100-10,000 RPS per server |
| Freshness | Stale by schedule interval | Fresh per request |
| Infrastructure | Spark, Databricks, BigQuery | FastAPI, Triton, BentoML |
| Feature freshness | Precomputed batch features OK | Online features required |
| Cost | Low (scheduled compute) | Higher (always-on servers) |
# Batch scoring with Spark
from pyspark.sql import SparkSession
import mlflow.spark
spark = SparkSession.builder.appName("BatchScoring").getOrCreate()
# Load trained model from MLflow
model_uri = "models:/churn-predictor/Production"
model = mlflow.spark.load_model(model_uri)
# Score 10M users in batch
users_df = spark.read.parquet("s3://data/users/features/2026-06-08/")
predictions = model.transform(users_df)
predictions.select("user_id", "prediction", "probability").write.parquet(
    "s3://data/predictions/churn/2026-06-08/"
)
# Real-time: see serving examples throughout this guide
Q27. How do you implement A/B testing for ML models with statistical rigor?
import numpy as np
import pandas as pd
from statsmodels.stats.proportion import proportions_ztest

class MLABTest:
    def __init__(self, control_data: pd.DataFrame, treatment_data: pd.DataFrame):
        self.control = control_data
        self.treatment = treatment_data

    def test_conversion_rate(self, alpha=0.05) -> dict:
        """Two-proportion z-test for binary outcomes (two-sided by default)."""
        count = np.array([
            self.treatment['converted'].sum(),
            self.control['converted'].sum()
        ])
        nobs = np.array([len(self.treatment), len(self.control)])
        stat, pval = proportions_ztest(count, nobs)
        control_rate = self.control['converted'].mean()
        treatment_rate = self.treatment['converted'].mean()
        # Relative lift is undefined when the control arm has no conversions
        lift = (treatment_rate - control_rate) / control_rate if control_rate > 0 else float('nan')
        return {
            'control_rate': round(control_rate, 4),
            'treatment_rate': round(treatment_rate, 4),
            'lift_pct': round(lift * 100, 2),
            'p_value': round(pval, 4),
            'significant': bool(pval < alpha),
            'sample_sizes': {'control': len(self.control), 'treatment': len(self.treatment)}
        }

    def sequential_test(self, alpha=0.05, max_n=50000) -> dict:
        """Sequential testing (avoids peeking problem with fixed-horizon tests)."""
        # SPRT (Sequential Probability Ratio Test) approach
        # Allows stopping early when sufficient evidence accumulates
        # Use when you cannot afford to wait for full sample
        raise NotImplementedError("sequential test is left as a stub in this example")
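# Power analysis (before running the test)
import math
from statsmodels.stats.power import NormalIndPower
from statsmodels.stats.proportion import proportion_effectsize
baseline_rate = 0.10   # assumed control conversion rate (illustrative); use your own
relative_lift = 0.05   # detect 5% relative lift in conversion
# NormalIndPower expects a standardized effect size (Cohen's h), not the raw lift
effect_size = proportion_effectsize(baseline_rate * (1 + relative_lift), baseline_rate)
alpha = 0.05
power = 0.80
analysis = NormalIndPower()
# 'larger' is one-sided; use 'two-sided' to match the default of proportions_ztest
required_n = analysis.solve_power(effect_size=effect_size,
                                  alpha=alpha, power=power, alternative='larger')
print(f"Required sample size per arm: {math.ceil(required_n)}")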
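The `sequential_test` stub names SPRT without showing it. The following is a minimal sketch of Wald's SPRT for a single Bernoulli stream, under a simplifying assumption: it tests the treatment arm's conversion rate against a fixed baseline `p0` versus a target rate `p1`, rather than comparing two live arms. The function name `sprt_bernoulli` and the return labels are hypothetical.

```python
import math
from typing import Iterable, Tuple


def sprt_bernoulli(outcomes: Iterable[int], p0: float, p1: float,
                   alpha: float = 0.05, beta: float = 0.20) -> Tuple[str, int]:
    """Wald's SPRT for H0: p = p0 versus H1: p = p1 on 0/1 outcomes.

    Returns (decision, observations_used); decision is 'accept_h1',
    'accept_h0', or 'continue' if the data ran out before a boundary was hit.
    """
    if not (0 < p0 < p1 < 1):
        raise ValueError("require 0 < p0 < p1 < 1")
    upper = math.log((1 - beta) / alpha)   # crossing above: evidence for H1
    lower = math.log(beta / (1 - alpha))   # crossing below: evidence for H0
    step_success = math.log(p1 / p0)
    step_failure = math.log((1 - p1) / (1 - p0))
    llr, n = 0.0, 0
    for x in outcomes:
        n += 1
        llr += step_success if x else step_failure  # running log-likelihood ratio
        if llr >= upper:
            return "accept_h1", n
        if llr <= lower:
            return "accept_h0", n
    return "continue", n


print(sprt_bernoulli([0] * 50, p0=0.10, p1=0.20))
```

Because the boundaries are fixed before any data arrives, checking after every observation does not inflate the error rates the way repeated peeking at a fixed-horizon p-value does. A two-arm production test would typically need a different formulation, such as a mixture SPRT or group-sequential boundaries.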
Q28. Design a complete ML platform for an e-commerce company.
ML Platform Architecture for E-commerce (2026):
Data Layer:
- Event streaming: Kafka (clickstream, purchases, search queries)
- Data lake: S3/GCS with Delta Lake format (version-controlled tables)
- Data warehouse: Snowflake or BigQuery (analytics, feature computation)
- Feature store: Feast (offline: Snowflake, online: Redis)
Training Layer:
- Experiment tracking: MLflow (self-hosted tracking server)
- Training orchestration: Kubeflow Pipelines on Kubernetes
- Compute: GPU clusters for deep learning; Spark on Databricks for tabular
- HPO: Optuna (Bayesian) with distributed workers
Serving Layer:
- REST API: FastAPI with uvicorn, containerized on Kubernetes
- Batch scoring: Spark on Databricks, results to S3
- Model server: Triton Inference Server for GPU-accelerated serving
- CDN/caching: Varnish or CloudFront for latency reduction
MLOps Layer:
- CI/CD: GitHub Actions -> Docker build -> Kubernetes deploy
- Model registry: MLflow Model Registry (champion-challenger management)
- Monitoring: Prometheus + Grafana (latency, throughput, drift)
- Alerting: PagerDuty integration for P0 model degradation
- Rollback: Automated rollback triggered by monitoring alerts
Key ML models:
- Recommendation: Two-tower model, updated hourly
- Search ranking: LightGBM with semantic features, updated daily
- Fraud detection: LightGBM + GNN, real-time scoring
- Churn prediction: XGBoost, weekly batch scoring
- Pricing optimization: Contextual bandit, real-time
MLOps Tools Reference 2026
| Category | Tools | Notes |
|---|---|---|
| Experiment Tracking | MLflow, Weights and Biases | MLflow for self-hosted; W&B for teams |
| Pipeline Orchestration | Kubeflow, Apache Airflow, Prefect | Kubeflow for K8s-native; Prefect for simplicity |
| Feature Store | Feast, Tecton, Hopsworks | Feast open-source; Tecton enterprise |
| Model Serving | Triton, BentoML, TorchServe, Ray Serve | Triton for GPU; BentoML for simplicity |
| Model Monitoring | Evidently, Arize, WhyLabs | Evidently for open-source |
| Data Validation | Great Expectations, Pandera | Great Expectations more mature |
| Container Orchestration | Kubernetes | De facto standard for production ML at scale |
FAQ
Q: What is the difference between DevOps and MLOps engineers? A: A DevOps engineer focuses on application code deployment, CI/CD, and infrastructure. An MLOps engineer additionally manages data pipelines, model training pipelines, model versioning, and production model monitoring. The roles overlap significantly; many MLOps engineers come from DevOps backgrounds.
Q: Is Kubernetes required for MLOps? A: For production at scale, yes. Most MLOps platforms (Kubeflow, Seldon, KServe) are Kubernetes-native. Understand pods, deployments, services, ConfigMaps, horizontal pod autoscaling, and resource limits.
Q: What is the most common MLOps failure mode? A: Training-serving skew is the most insidious. Models perform well in offline evaluation but degrade in production because features are computed differently in the training and serving paths. The fix is to compute features through a shared feature store or to serialize the preprocessing inside the model artifact.
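To illustrate the "serialize preprocessing inside the model artifact" fix, here is a minimal standard-library sketch. It assumes a toy linear scorer with standard scaling; `ScoringArtifact` and `fit_artifact` are hypothetical names. In practice the same idea is usually expressed with a framework pipeline object saved as one unit.

```python
import json
from dataclasses import asdict, dataclass
from statistics import mean, pstdev
from typing import List, Sequence


@dataclass
class ScoringArtifact:
    """Bundles fitted scaling statistics with model weights in one artifact."""
    means: List[float]
    stds: List[float]
    weights: List[float]
    bias: float

    def predict(self, row: Sequence[float]) -> float:
        # The scaling learned at training time is applied inside the artifact,
        # so the serving path cannot drift from the training path.
        scaled = [(v - m) / s for v, m, s in zip(row, self.means, self.stds)]
        return sum(w * v for w, v in zip(self.weights, scaled)) + self.bias

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, payload: str) -> "ScoringArtifact":
        return cls(**json.loads(payload))


def fit_artifact(rows: Sequence[Sequence[float]],
                 weights: Sequence[float], bias: float) -> ScoringArtifact:
    cols = list(zip(*rows))
    means = [mean(c) for c in cols]
    stds = [pstdev(c) or 1.0 for c in cols]  # guard against constant columns
    return ScoringArtifact(means, stds, list(weights), bias)


train_rows = [[10.0, 1.0], [20.0, 3.0], [30.0, 5.0]]
artifact = fit_artifact(train_rows, weights=[0.5, -0.25], bias=0.1)
served = ScoringArtifact.from_json(artifact.to_json())  # what the server would load
print(served.predict([25.0, 2.0]))
```

The serving code only ever sees raw feature values and the loaded artifact, so there is no second implementation of the scaling step to fall out of sync.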
Methodology applied to this article · last verified 8 Jun 2026
- No fabricated salary numbers or success rates. If we quote a range, it's sourced.
- No noun-substituted templates. This article was not generated by swapping company names in a stock prompt.
- No paid placements, sponsored coaching links, or affiliate-shilled course pushes.