Sun, 27 Apr 2026
vol. IX · no. 117
PapersAdda
placement intelligence, since 2017

MLOps Interview Questions 2026: 28 Answers with Code

Updated: 8 Jun 2026
Aditya Sharma
Aditya's Edit

PapersAdda 2026 Placement Cycle

By Aditya Sharma·Founder & Editor, PapersAdda

What changed in 2026 drives

Mass-recruiter offer letters are flatter for the 2026 batch - the 4-5 LPA ASE band has barely budged in three years while inflation eats real wages. Premium tracks (Digital, Pro, Elite, Specialist) are still where the differential lives, and they are entirely test-driven. If you are aiming higher than the default offer, the coding round is not optional pageantry - it is the entire interview.

What I'd actually study for this

  • 01 Two solid coding-round answers (1 medium-hard DSA each, with edge-case discussion) > five half-baked ones
  • 02 One real project you can defend end-to-end - file paths, design decisions, and what you would change
  • 03 One DBMS schema you actually built (not a textbook ER diagram), with at least 3 join-heavy queries written from memory
  • 04 Three behavioural STAR stories: failure recovered, conflict handled, ownership taken

Where most candidates trip up

The single biggest mistake is treating company-specific guides as primary prep and DSA as secondary. It is the opposite. Mass recruiters use the test as a filter, but premium tracks at every IT services company use coding to allocate offer band. Spend 70% of prep time on DSA + system fundamentals, 20% on company-specific patterns, 10% on HR rehearsal. Reverse that ratio and you collect the default offer.

Editorial commentary by Aditya Sharma · written for PapersAdda · not generated, not aggregated.

MLOps is the fastest-growing specialization at the intersection of ML and software engineering. Companies have moved from proof-of-concept models to production systems that require continuous training, monitoring, and deployment automation. Candidates report that MLOps roles at Databricks, Google, Amazon, and major Indian tech unicorns are among the best-compensated engineering positions in 2026. This guide covers 28 MLOps interview questions with full answers and production-grade code.

PapersAdda's take: MLOps interviews test whether you can operate an ML system, not just train a model. The questions are heavily scenario-based: "Your model's accuracy dropped 8% last week. Walk me through your debugging process." Prepare for those scenarios. According to candidate accounts from public preparation resources, monitoring and drift detection questions appear in over 70% of senior MLOps rounds. Confirm the specific tooling stack expected on the official company careers portal before preparing.

Related articles: Machine Learning Interview Questions 2026 | Deep Learning Interview Questions 2026 | Data Engineering Interview Questions 2026 | AWS Solutions Architect Interview 2026 | DevOps Engineer Interview Questions 2026


Which Companies Ask MLOps Questions?

Company / Role | MLOps Focus
Databricks, Snowflake | ML pipeline orchestration, lakehouse integration
Google (GCP ML) | Vertex AI pipelines, TFX, model serving
Amazon (SageMaker) | Managed ML pipelines, endpoints, monitoring
Netflix, Spotify, Uber | Large-scale model serving, feature stores
Indian unicorns (Meesho, Swiggy, PhonePe) | Production recommendation and fraud ML

EASY: Core MLOps Concepts (Questions 1-8)

Q1. What is MLOps? How does it differ from DevOps?

Dimension | DevOps | MLOps
Artifact | Application code | Code + Data + Model
Testing | Unit, integration, E2E | + Data validation, model evaluation
Deployment | Rolling, blue-green | + Canary, shadow, A/B, champion-challenger
Monitoring | Latency, error rate | + Data drift, model degradation, bias
Trigger for redeployment | Code change | + Data drift, performance drop
Key challenge | Reproducibility of builds | Reproducibility of experiments

MLOps extends DevOps to handle the unique properties of ML systems: data and model versioning, experiment tracking, and continuous evaluation.


Q2. What is experiment tracking? Implement an MLflow run.

import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
import numpy as np

mlflow.set_tracking_uri("http://localhost:5000")   # or use mlruns/ locally
mlflow.set_experiment("churn-prediction")

with mlflow.start_run(run_name="gb-v1"):
    # Log hyperparameters
    params = {'n_estimators': 500, 'learning_rate': 0.05, 'max_depth': 5}
    mlflow.log_params(params)

    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    val_auc = roc_auc_score(y_val, model.predict_proba(X_val)[:,1])
    mlflow.log_metric("val_roc_auc", val_auc)
    mlflow.log_metric("train_roc_auc",
                       roc_auc_score(y_train, model.predict_proba(X_train)[:,1]))

    # Log model with signature
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(model, "model", signature=signature,
                              registered_model_name="churn-gb")

    # Log artifacts (feature importance plot, etc.)
    mlflow.log_artifact("feature_importance.png")
    mlflow.log_dict({"features": list(X_train.columns)}, "features.json")

    print(f"Val AUC: {val_auc:.4f}")
    print(f"Run ID: {mlflow.active_run().info.run_id}")

Q3. What is a feature store? Why is it important?

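A feature store is a central system that computes, stores, and serves features for both training and inference. It addresses two problems:

  1. Training-serving skew: Features computed differently at training vs. serving time lead to subtle bugs
  2. Reusability: Feature engineering work is shared across teams and models
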
from feast import FeatureStore
from datetime import datetime
import pandas as pd

# Initialize store from feature_store.yaml
store = FeatureStore(repo_path="feature_repo/")

# Retrieve features for training (point-in-time correct joins)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": [
        datetime(2026, 1, 15), datetime(2026, 1, 20), datetime(2026, 1, 25)
    ]
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_spend_90d",
        "user_stats:login_count_7d",
        "product_features:avg_price_category"
    ]
).to_df()

# Serve features at inference time (low-latency)
feature_vector = store.get_online_features(
    features=["user_stats:total_spend_90d", "user_stats:login_count_7d"],
    entity_rows=[{"user_id": 1001}]
).to_dict()
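
The "point-in-time correct joins" comment above is the part interviewers tend to probe. A minimal standard-library sketch of the idea follows; it is not how Feast implements it, and the function name and the sample history are hypothetical. For each label timestamp, it picks the latest feature value recorded at or before that moment, so values from the future never leak into training rows.

```python
from bisect import bisect_right
from datetime import datetime

def point_in_time_lookup(feature_history, as_of):
    """Return the latest feature value recorded at or before `as_of`.

    `feature_history` is a list of (timestamp, value) pairs sorted by timestamp.
    Returns None when no value existed yet, so future data never leaks in.
    """
    timestamps = [ts for ts, _ in feature_history]
    idx = bisect_right(timestamps, as_of)  # number of records with ts <= as_of
    return feature_history[idx - 1][1] if idx > 0 else None

# Hypothetical history of total_spend_90d for one user
spend_history = [
    (datetime(2026, 1, 10), 120.0),
    (datetime(2026, 1, 18), 340.0),
    (datetime(2026, 1, 24), 410.0),
]

# Label events: two after the first write, one before any feature existed
label_events = [datetime(2026, 1, 15), datetime(2026, 1, 20), datetime(2026, 1, 5)]
training_values = [point_in_time_lookup(spend_history, ts) for ts in label_events]
print(training_values)  # [120.0, 340.0, None]
```

A naive join on user_id alone would have attached the latest value (410.0) to all three rows, which is exactly the leakage a feature store is meant to prevent.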

Q4. What is model versioning? How do you manage the model registry?

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model
run_id = "abc123def456"
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "churn-predictor")

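# Transition model stages
# (stages are deprecated in recent MLflow 2.x releases in favor of aliases;
#  this call still works on registries that use stages)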
client.transition_model_version_stage(
    name="churn-predictor",
    version=3,
    stage="Staging"
)

# Load model by stage
model = mlflow.sklearn.load_model("models:/churn-predictor/Staging")

# Compare models
latest_versions = client.get_latest_versions("churn-predictor", stages=["Production", "Staging"])
for v in latest_versions:
    print(f"Version {v.version} ({v.current_stage}): {v.run_id}")

# Promote after validation
client.transition_model_version_stage(
    name="churn-predictor",
    version=3,
    stage="Production",
    archive_existing_versions=True  # archive previous Production version
)

Q5. What is a CI/CD pipeline for ML? What stages does it include?

CI/CD for ML (unlike software CI/CD, must handle data + model + code):

Stage 1: Code validation
  - Lint (flake8, black)
  - Unit tests (pytest) for feature transforms, preprocessing
  - Type checking (mypy)

Stage 2: Data validation (NEW vs software)
  - Schema check (Great Expectations)
  - Distribution drift vs reference stats
  - Label quality checks

Stage 3: Model training (on small data slice for CI speed)
  - Run training pipeline
  - Check train metric above baseline threshold

Stage 4: Model evaluation
  - Compare new model vs champion (current Production)
  - Slice-based evaluation (performance by subgroup)
  - Bias/fairness checks (Aequitas, Fairlearn)

Stage 5: Model packaging and staging deployment
  - Package as Docker image
  - Deploy to Staging (shadow mode or canary 1%)
  - Load test serving endpoint

Stage 6: Production deployment
  - Gradual traffic rollout (canary)
  - Monitor for 30 minutes before full promotion
  - Rollback trigger if metrics degrade
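
The evaluation stage (Stage 4) can be sketched as a small promotion gate. This is an illustrative sketch under stated assumptions, not a standard API: the function name, the report layout, and the thresholds (`min_gain`, `max_slice_drop`) are all hypothetical. It blocks promotion when the overall metric does not improve or when any slice regresses beyond a tolerance.

```python
def passes_promotion_gate(champion, challenger, min_gain=0.0, max_slice_drop=0.02):
    """Decide whether a challenger may replace the champion.

    Both reports are dicts: {"overall": auc, "slices": {slice_name: auc}}.
    Returns (decision, reason).
    """
    if challenger["overall"] < champion["overall"] + min_gain:
        return False, "overall metric did not improve"
    for name, champ_auc in champion["slices"].items():
        chall_auc = challenger["slices"].get(name)
        if chall_auc is None:
            return False, f"slice '{name}' missing from challenger report"
        if champ_auc - chall_auc > max_slice_drop:
            return False, f"slice '{name}' regressed"
    return True, "ok"

# Hypothetical evaluation reports
champion_report = {"overall": 0.85, "slices": {"new_users": 0.80, "returning": 0.88}}
regressing      = {"overall": 0.87, "slices": {"new_users": 0.76, "returning": 0.90}}
improving       = {"overall": 0.87, "slices": {"new_users": 0.81, "returning": 0.90}}

print(passes_promotion_gate(champion_report, regressing))  # (False, "slice 'new_users' regressed")
print(passes_promotion_gate(champion_report, improving))   # (True, 'ok')
```

The first challenger has a better overall AUC but still fails, which is the point of slice-based gating: an aggregate gain can hide a subgroup regression.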

Q6. What are the deployment strategies for ML models?

Strategy | How | Risk | Use When
Blue-green | Maintain two full environments; switch traffic | Low (instant rollback) | Stateless models, cost-insensitive
Canary | Route X% traffic to new model; increase gradually | Low | Default for production ML
Shadow | New model runs in parallel, receives same requests, results discarded | None | Testing high-stakes models
A/B test | Route user cohorts to different models; measure outcome | Medium (users see different quality) | Evaluating business impact
Champion-challenger | Production model (champion) vs new model (challenger) in A/B | Medium | Continuous model improvement

import random

class ModelRouter:
    def __init__(self, champion, challenger, challenger_fraction=0.1):
        self.champion   = champion
        self.challenger = challenger
        self.challenger_fraction = challenger_fraction

    def predict(self, features):
        if random.random() < self.challenger_fraction:
            result = self.challenger.predict(features)
            model_version = "challenger"
        else:
            result = self.champion.predict(features)
            model_version = "champion"
        # Log to metrics store for comparison
        self._log(model_version, result)
        return result

    def _log(self, version, result):
        pass  # write to analytics DB for A/B comparison
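
One caveat with the router above: `random.random()` assigns traffic per request, so the same user can bounce between champion and challenger. For an A/B test that routes user cohorts, a common alternative is deterministic hash-based bucketing. The sketch below is an assumption-laden illustration (the function name, the salt, and the user-ID format are hypothetical), using only the standard library.

```python
import hashlib

def assign_variant(user_id: str, challenger_fraction: float = 0.1,
                   salt: str = "exp-1") -> str:
    """Map a user to a stable variant using a salted hash of the user ID."""
    digest = hashlib.sha256(f"{salt}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0x100000000  # uniform value in [0, 1)
    return "challenger" if bucket < challenger_fraction else "champion"

# The same user always lands in the same arm
first = assign_variant("user-1001")
second = assign_variant("user-1001")
print(first == second)  # True

# Across many users the challenger share approaches the configured fraction
share = sum(assign_variant(f"user-{i}") == "challenger" for i in range(10_000)) / 10_000
print(round(share, 2))
```

Changing the salt reshuffles the cohorts, which is useful when starting a new experiment so that the same users are not always the ones exposed to challengers.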

Q7. What is data drift and how do you detect it?

Type | What Shifts | Detection
Covariate shift | P(X) changes; P(Y|X) stays same | KS test or PSI on input features
Label drift | P(Y) changes | Monitor prediction distribution
Concept drift | P(Y|X) changes | Monitor model performance on fresh labels
Upstream data drift | Schema or values change upstream | Schema + value range monitoring

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from scipy.stats import ks_2samp, chi2_contingency
import pandas as pd
import numpy as np

# Evidently for automated drift reports
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=X_train_sample, current_data=X_production_sample)
report.save_html("drift_report.html")

# Manual KS test for continuous features
def check_continuous_drift(train_series, prod_series, alpha=0.05):
    stat, pval = ks_2samp(train_series, prod_series)
    return {'drift': pval < alpha, 'ks_stat': stat, 'p_value': pval}

# PSI (Population Stability Index) for model scores
def compute_psi(expected, actual, n_bins=10):
    eps = 1e-10
    bins = np.percentile(expected, np.linspace(0, 100, n_bins+1))
    bins[0], bins[-1] = -np.inf, np.inf
    exp_pct = np.histogram(expected, bins=bins)[0] / len(expected) + eps
    act_pct = np.histogram(actual,   bins=bins)[0] / len(actual)   + eps
    psi = np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct))
    # PSI < 0.1: no drift; 0.1-0.2: moderate; > 0.2: major drift
    return psi
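
To make the PSI thresholds concrete, here is a worked example on pre-binned proportions, using only the standard library. It is a simplified sketch: the function name and the bin proportions are made up for illustration, and it takes proportions directly instead of raw samples.

```python
import math

def psi_from_proportions(expected_pct, actual_pct, eps=1e-10):
    """PSI = sum over bins of (actual - expected) * ln(actual / expected)."""
    return sum(
        (a - e) * math.log((a + eps) / (e + eps))
        for e, a in zip(expected_pct, actual_pct)
    )

reference = [0.25, 0.25, 0.25, 0.25]   # training-time score distribution
stable    = [0.24, 0.26, 0.25, 0.25]   # small wobble
shifted   = [0.10, 0.20, 0.30, 0.40]   # mass moved toward high scores

psi_stable = psi_from_proportions(reference, stable)
psi_shifted = psi_from_proportions(reference, shifted)

print(f"stable:  {psi_stable:.4f}")   # well below 0.1 -> no drift
print(f"shifted: {psi_shifted:.4f}")  # above 0.2 -> major drift
```

Every term in the sum is non-negative, because (a - e) and ln(a / e) always share a sign, so PSI is zero only when the two distributions match bin for bin.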

Q8. How do you containerize an ML model for deployment?

# Dockerfile for ML model serving
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies first (Docker layer caching)
RUN apt-get update && apt-get install -y libgomp1 && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY app/ app/
COPY models/ models/

# Create non-root user
RUN useradd -m -u 1000 mluser && chown -R mluser:mluser /app
USER mluser

EXPOSE 8080
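# python:3.11-slim does not ship curl, so probe the endpoint with Python itself
HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health', timeout=5)" || exit 1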

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]

# FastAPI serving application (app/main.py)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.sklearn
import numpy as np

app = FastAPI()
model = mlflow.sklearn.load_model("models:/churn-predictor/Production")

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    probability: float
    prediction: int
    model_version: str = "production"

@app.get("/health")
def health():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    try:
        x = np.array(request.features).reshape(1, -1)
        prob = float(model.predict_proba(x)[0, 1])
        return PredictionResponse(probability=prob, prediction=int(prob >= 0.5))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

MEDIUM: Production ML Systems (Questions 9-20)

Q9. How do you implement model monitoring in production?

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Prometheus metrics
PREDICTION_COUNT     = Counter('ml_predictions_total', 'Total predictions', ['model', 'version'])
PREDICTION_LATENCY   = Histogram('ml_prediction_latency_seconds', 'Prediction latency')
PREDICTION_SCORE_AVG = Gauge('ml_prediction_score_avg', 'Rolling average prediction score')
DATA_DRIFT_PSI       = Gauge('ml_data_drift_psi', 'PSI for feature drift', ['feature'])

from collections import deque

class MonitoredModel:
    def __init__(self, model, model_name, version, window=1000):
        self.model = model
        self.model_name = model_name
        self.version = version
        # Bounded buffer: old scores are dropped so memory stays constant
        self.score_buffer = deque(maxlen=window)

    def predict(self, x):
        start = time.perf_counter()
        score = self.model.predict_proba(x)[:, 1]
        elapsed = time.perf_counter() - start

        # Record metrics
        PREDICTION_COUNT.labels(self.model_name, self.version).inc(len(x))
        PREDICTION_LATENCY.observe(elapsed)

        # Rolling average score over the last `window` predictions
        self.score_buffer.extend(score.tolist())
        if len(self.score_buffer) == self.score_buffer.maxlen:
            PREDICTION_SCORE_AVG.set(sum(self.score_buffer) / len(self.score_buffer))

        return score

# Alerting thresholds
ALERT_THRESHOLDS = {
    'accuracy_drop_pct': 5,      # alert if accuracy drops > 5%
    'drift_psi': 0.2,             # alert if PSI > 0.2 (major drift)
    'p99_latency_ms': 100,        # alert if P99 latency > 100ms
    'error_rate_pct': 0.1         # alert if >0.1% prediction errors
}
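
The thresholds above only matter once something evaluates them. A minimal sketch of that check follows; the function name and the sample metric values are hypothetical, and a real setup would more likely express these as Prometheus alerting rules than as application code.

```python
# Hypothetical copy of the thresholds above so the sketch runs on its own
thresholds = {
    "accuracy_drop_pct": 5,
    "drift_psi": 0.2,
    "p99_latency_ms": 100,
    "error_rate_pct": 0.1,
}

def breached_alerts(metrics, thresholds):
    """Return the sorted names of metrics that exceed their threshold."""
    return sorted(
        name for name, limit in thresholds.items()
        if name in metrics and metrics[name] > limit
    )

current = {"accuracy_drop_pct": 2.0, "drift_psi": 0.27,
           "p99_latency_ms": 140, "error_rate_pct": 0.05}
print(breached_alerts(current, thresholds))  # ['drift_psi', 'p99_latency_ms']
```

Metrics that are missing from the snapshot are skipped here; whether a missing metric should itself raise an alert is a design decision worth raising in an interview.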

Q10. What is the training-serving skew and how do you prevent it?

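Training-serving skew is a mismatch between the features or preprocessing a model sees during training and what it receives in production.

Root causes: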

  • Transform code copy-pasted (diverges over time)
  • Different libraries or versions
  • Different data freshness
  • Missing values handled differently

Prevention:

# The gold standard: use the SAME code path for training and serving
# Strategy 1: Feature store (same features for both)
# Strategy 2: Serialize preprocessing inside the model artifact

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
import joblib

# Bundle preprocessing and model together
preprocessor = ColumnTransformer([
    ('num', StandardScaler(), numerical_cols),
    ('cat', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1), cat_cols)
])
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('model', GradientBoostingClassifier(n_estimators=300))
])

full_pipeline.fit(X_train_raw, y_train)  # raw input (no preprocessing outside pipeline)
joblib.dump(full_pipeline, 'pipeline.pkl')

# At serving time: pass raw features
pipeline = joblib.load('pipeline.pkl')
predictions = pipeline.predict(raw_features)  # same preprocessing as training

# Audit: periodically compare training pipeline output with serving pipeline output
# using the same raw inputs
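
The audit mentioned in the last comment can be sketched as follows. This is a minimal illustration under assumptions: the two transform functions, the median fill value, and the sample rows are all hypothetical stand-ins for a real training and serving code path. It feeds identical raw rows through both paths and reports every feature where the outputs disagree.

```python
import math

def audit_skew(raw_rows, training_transform, serving_transform, tol=1e-6):
    """Run both transforms on the same raw rows and list disagreeing features."""
    mismatches = []
    for i, row in enumerate(raw_rows):
        train_out = training_transform(row)
        serve_out = serving_transform(row)
        for key in sorted(set(train_out) | set(serve_out)):
            a, b = train_out.get(key), serve_out.get(key)
            if a is None or b is None or not math.isclose(a, b, abs_tol=tol):
                mismatches.append((i, key, a, b))
    return mismatches

# Hypothetical divergence: missing income is filled differently in each path
def training_transform(row):
    return {"income": row["income"] if row["income"] is not None else 50000.0}

def serving_transform(row):
    return {"income": row["income"] if row["income"] is not None else 0.0}

rows = [{"income": 42000.0}, {"income": None}]
skew_report = audit_skew(rows, training_transform, serving_transform)
print(skew_report)  # [(1, 'income', 50000.0, 0.0)]
```

Only the row with a missing value is flagged, which matches the "missing values handled differently" root cause listed above.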

Q11. How do you design a model retraining pipeline?

Retraining pipeline design:

Trigger conditions (any of these):
  1. Scheduled (weekly or monthly)
  2. Drift detected (PSI > 0.2 or KS p-value < 0.01)
  3. Performance drop (AUC drops > 5% vs baseline)
  4. New labeled data available (batch labeling completed)

Pipeline steps:
  1. Data ingestion (feature store pull, label join)
  2. Data validation (Great Expectations schema + distribution)
  3. Feature computation (run transform pipeline)
  4. Model training (hyperparameter tuning if major drift, else fixed params)
  5. Evaluation (compare to current production model on holdout)
  6. Model registration (MLflow, version bump)
  7. Staging deployment (shadow serving)
  8. Promotion (if staging metrics pass thresholds)
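
The trigger conditions above can be sketched as a single decision function. The function and parameter names are hypothetical, and the sketch assumes "AUC drops > 5%" means a relative drop against the baseline and that the schedule is weekly.

```python
def retraining_reasons(psi, ks_p_value, baseline_auc, current_auc,
                       days_since_last_train, new_labels_ready,
                       schedule_days=7):
    """Return the list of trigger conditions that currently hold."""
    reasons = []
    if days_since_last_train >= schedule_days:
        reasons.append("scheduled")
    if psi > 0.2 or ks_p_value < 0.01:
        reasons.append("drift")
    # Relative AUC drop versus baseline (assumed interpretation of "> 5%")
    if baseline_auc > 0 and (baseline_auc - current_auc) / baseline_auc > 0.05:
        reasons.append("performance_drop")
    if new_labels_ready:
        reasons.append("new_labels")
    return reasons

quiet = retraining_reasons(psi=0.05, ks_p_value=0.5, baseline_auc=0.90,
                           current_auc=0.89, days_since_last_train=2,
                           new_labels_ready=False)
noisy = retraining_reasons(psi=0.25, ks_p_value=0.5, baseline_auc=0.90,
                           current_auc=0.84, days_since_last_train=2,
                           new_labels_ready=False)
print(quiet)  # []
print(noisy)  # ['drift', 'performance_drop']
```

Returning the reasons instead of a bare boolean lets the pipeline log why it ran and choose between fixed parameters and a full hyperparameter search, as step 4 suggests.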
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Using Apache Beam for data ingestion + transformation
def run_training_pipeline(data_path, model_output_path, run_date):
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',
        '--region=us-central1'
    ])

    with beam.Pipeline(options=options) as p:
        data = (
            p
            | 'ReadData' >> beam.io.ReadFromParquet(data_path)
            | 'FilterDate' >> beam.Filter(lambda row: row['date'] >= run_date)
            | 'ComputeFeatures' >> beam.Map(compute_features)
            | 'FilterValid' >> beam.Filter(validate_row)
        )
        # ... rest of pipeline

# Kubeflow pipeline (production standard in 2026)
import kfp
from kfp import dsl

@dsl.component(base_image='python:3.11', packages_to_install=['scikit-learn', 'mlflow'])
def train_model(data_path: str, model_output: dsl.Output[dsl.Model]):
    import mlflow.sklearn
    # ... training code
    pass

# evaluate_model and deploy_if_better are assumed to be @dsl.component
# functions defined the same way as train_model (not shown here)
@dsl.pipeline(name='churn-retraining')
def retraining_pipeline(data_path: str = 'gs://bucket/data/'):
    train_task = train_model(data_path=data_path)
    evaluate_task = evaluate_model(model=train_task.outputs['model_output'])
    deploy_task = deploy_if_better(evaluation=evaluate_task.outputs['metrics'])

Q12. What is BentoML? How do you use it for model serving?

import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON

# Save model to BentoML model store (legacy BentoML 1.x runner API)
# Expose predict_proba, because the service below returns class probabilities
bentoml.sklearn.save_model("churn_model", trained_model,
                             signatures={"predict_proba": {"batchable": True}})

# Define service
runner = bentoml.sklearn.get("churn_model:latest").to_runner()
svc = bentoml.Service("churn_service", runners=[runner])

@svc.api(input=NumpyNdarray(dtype="float32", shape=(-1, 20)),
          output=JSON())
async def predict(input_data: np.ndarray) -> dict:
    # predict_proba returns shape (n_samples, n_classes); column 1 is the positive class
    probs = await runner.predict_proba.async_run(input_data)
    return {
        "probabilities": probs[:, 1].tolist(),
        "predictions": (probs[:, 1] >= 0.5).astype(int).tolist()
    }

# Build and containerize
# bentoml build -> bentoml containerize churn_service:latest

Q13. How do you run load testing for an ML serving endpoint?

# Using Locust for ML endpoint load testing
from locust import HttpUser, task, between
import numpy as np
import json

class MLModelUser(HttpUser):
    wait_time = between(0.05, 0.2)   # 50-200 ms think time per simulated user

    def on_start(self):
        # Pre-generate random feature vectors (64 rows x 20 features)
        self.feature_batch = np.random.randn(64, 20).astype('float32')

    @task
    def predict_single(self):
        # Flat list of 20 floats, matching a `features: list[float]` request schema
        features = np.random.randn(20).astype('float32')
        # Locust records non-2xx responses as failures automatically
        self.client.post(
            "/predict",
            json={"features": features.tolist()}
        )

    @task(weight=3)
    def predict_batch(self):
        # Assumes the endpoint also accepts a 2-D list of rows; a schema that
        # only takes a flat list will reject this payload with HTTP 422
        self.client.post(
            "/predict",
            json={"features": self.feature_batch.tolist()}
        )

# Run: locust -f locustfile.py --host=http://model-api --users=100 --spawn-rate=10

# Targets for a production ML API:
# P50 latency: < 20ms
# P99 latency: < 100ms
# Throughput:  > 1,000 RPS per instance
# Error rate:  < 0.1%
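
To check a finished run against the targets above, the raw latencies can be reduced to percentiles. The sketch below is a standard-library illustration with made-up samples and a hypothetical function name; Locust's own statistics report gives the same figures, so this is mainly useful for post-processing exported results.

```python
import statistics

def latency_report(latencies_ms, errors, total_requests):
    """Summarize a load test and compare it with the P50/P99/error targets."""
    cuts = statistics.quantiles(latencies_ms, n=100, method="inclusive")
    p50, p99 = cuts[49], cuts[98]          # 50th and 99th percentile cut points
    error_rate_pct = 100.0 * errors / total_requests
    return {
        "p50_ms": p50,
        "p99_ms": p99,
        "error_rate_pct": error_rate_pct,
        "meets_targets": p50 < 20 and p99 < 100 and error_rate_pct < 0.1,
    }

# Hypothetical samples: a healthy run and one with a slow 5% tail
healthy = latency_report([10.0] * 100, errors=0, total_requests=100)
slow_tail = latency_report([10.0] * 95 + [150.0] * 5, errors=0, total_requests=100)

print(healthy["meets_targets"])    # True
print(slow_tail["p50_ms"], slow_tail["p99_ms"], slow_tail["meets_targets"])  # 10.0 150.0 False
```

The second run has the same median as the first, which is why tail percentiles, not averages, are the numbers to quote for a serving SLO.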

Q14. What is an ML metadata store? What does it track?

Entity | What Is Tracked
Data | Path, schema, statistics, splits, version
Feature | Computation code, source data, statistics
Model | Architecture, hyperparameters, training data version, metrics
Experiment | Parameters, metrics, code version (git SHA)
Deployment | Model version, deployment time, endpoint, traffic
Prediction | Input features, output scores, timestamp, model version

from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2 as mlmd_pb2

# Create metadata store (use the mlmd_pb2 alias imported above)
connection_config = mlmd_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = '/tmp/mlmd.db'
connection_config.sqlite.connection_mode = 3  # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)

# Register artifact types
dataset_type = mlmd_pb2.ArtifactType()
dataset_type.name = "Dataset"
dataset_type_id = store.put_artifact_type(dataset_type)

# Record a training dataset artifact
dataset = mlmd_pb2.Artifact()
dataset.uri = "gs://bucket/train_data/2026-06-08/"
dataset.type_id = dataset_type_id
dataset_id = store.put_artifacts([dataset])[0]

Q15. How do you implement shadow deployment for safe model rollout?

import asyncio
import logging
from dataclasses import dataclass
from typing import Any

@dataclass
class PredictionResult:
    score: float
    latency_ms: float
    model_version: str

class ShadowDeploymentProxy:
    """Routes requests to champion; concurrently fires shadow requests."""

    def __init__(self, champion_model, shadow_model, shadow_log_path):
        self.champion  = champion_model
        self.shadow    = shadow_model
        self.log_path  = shadow_log_path
        self._tasks    = set()  # strong references so pending shadow tasks are not garbage-collected

    async def predict(self, features: Any) -> PredictionResult:
        import time

        # Champion prediction (synchronous, returned to caller)
        t0 = time.perf_counter()
        champion_score = self.champion.predict_proba([features])[0, 1]
        champion_latency = (time.perf_counter() - t0) * 1000

        # Shadow prediction (fire-and-forget, does not block caller)
        task = asyncio.create_task(self._shadow_predict(features, champion_score))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

        return PredictionResult(
            score=float(champion_score),
            latency_ms=champion_latency,
            model_version="champion"
        )

    async def _shadow_predict(self, features, champion_score):
        import time
        try:
            t0 = time.perf_counter()
            # Run the blocking model call in a worker thread so the event loop stays free
            shadow_probs = await asyncio.to_thread(self.shadow.predict_proba, [features])
            shadow_score = float(shadow_probs[0, 1])
            shadow_latency = (time.perf_counter() - t0) * 1000
            self._log_shadow(features, float(champion_score), shadow_score, shadow_latency)
        except Exception as e:
            logging.error(f"Shadow prediction failed: {e}")

    def _log_shadow(self, features, champion, shadow, latency):
        # Log to analytics for offline comparison
        with open(self.log_path, 'a') as f:
            import json
            f.write(json.dumps({'champion': champion, 'shadow': shadow,
                                  'latency': latency}) + '\n')
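
The shadow log is only useful once it is compared offline. A minimal sketch of that comparison follows, assuming the JSON-lines format written above with `champion` and `shadow` keys; the function name, the 0.5 decision threshold, and the sample records are hypothetical.

```python
import json

def summarize_shadow_log(lines, threshold=0.5):
    """Compare champion and shadow scores from JSON-lines log records."""
    records = [json.loads(line) for line in lines if line.strip()]
    n = len(records)
    if n == 0:
        return {"n": 0, "mean_abs_diff": 0.0, "disagreement_rate": 0.0}
    mean_abs_diff = sum(abs(r["champion"] - r["shadow"]) for r in records) / n
    # A disagreement is a request where the two models land on opposite
    # sides of the decision threshold
    disagreements = sum(
        (r["champion"] >= threshold) != (r["shadow"] >= threshold) for r in records
    )
    return {"n": n, "mean_abs_diff": mean_abs_diff,
            "disagreement_rate": disagreements / n}

# Hypothetical log contents
log_lines = [
    '{"champion": 0.9, "shadow": 0.8, "latency": 4.1}',
    '{"champion": 0.4, "shadow": 0.6, "latency": 3.9}',
    '{"champion": 0.2, "shadow": 0.1, "latency": 4.4}',
]
summary = summarize_shadow_log(log_lines)
print(summary["n"], round(summary["mean_abs_diff"], 3), round(summary["disagreement_rate"], 3))
```

The disagreement rate is usually the more actionable number: a small score shift that flips many decisions matters more than a larger shift that flips none.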

Q16. What is Triton Inference Server? When do you use it?

# Client call to Triton
import tritonclient.http as triton_http
import numpy as np

client = triton_http.InferenceServerClient(url="localhost:8000")

# Check model status
print(client.is_model_ready("my_model"))

# Prepare inputs
inputs = [triton_http.InferInput("input__0", [1, 224, 224, 3], "FP32")]
inputs[0].set_data_from_numpy(image_array.astype(np.float32))

# Run inference
outputs = [triton_http.InferRequestedOutput("output__0")]
response = client.infer("my_model", inputs, outputs=outputs)
predictions = response.as_numpy("output__0")

# Dynamic batching config (model config YAML):
# dynamic_batching {
#   preferred_batch_size: [8, 16, 32]
#   max_queue_delay_microseconds: 5000
# }

# When to use Triton:
# - Multi-model serving on single GPU
# - Mixed-framework serving (some PyTorch, some ONNX, some TensorRT)
# - High-throughput serving where batching efficiency matters
# - Ensemble models (chain multiple models)

Q17. How do you implement online feature computation for real-time serving?

import redis
import json
from datetime import datetime

class OnlineFeatureStore:
    """Redis-backed online feature store for sub-millisecond feature serving."""

    def __init__(self, redis_host='localhost', redis_port=6379):
        self.r = redis.Redis(host=redis_host, port=redis_port,
                              decode_responses=True, socket_connect_timeout=2)

    def get_user_features(self, user_id: str) -> dict:
        key = f"user_features:{user_id}"
        raw = self.r.get(key)
        if raw is None:
            return self._compute_default_features(user_id)
        return json.loads(raw)

    def set_user_features(self, user_id: str, features: dict, ttl_seconds: int = 3600):
        key = f"user_features:{user_id}"
        self.r.setex(key, ttl_seconds, json.dumps(features))

    def batch_get_features(self, user_ids: list) -> dict:
        pipe = self.r.pipeline()
        for uid in user_ids:
            pipe.get(f"user_features:{uid}")
        results = pipe.execute()
        return {
            uid: json.loads(r) if r else self._compute_default_features(uid)
            for uid, r in zip(user_ids, results)
        }

    def _compute_default_features(self, user_id: str) -> dict:
        return {"total_spend_90d": 0.0, "login_count_7d": 0, "days_since_signup": 0}

# Feature pipeline: Kafka consumer writes features to Redis
from confluent_kafka import Consumer
import json

def feature_update_consumer():
    consumer = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'feature-updater',
        'auto.offset.reset': 'latest'
    })
    consumer.subscribe(['user-events'])
    store = OnlineFeatureStore()

    while True:
        msg = consumer.poll(1.0)
        if msg and not msg.error():
            event = json.loads(msg.value())
            features = compute_incremental_features(event)
            store.set_user_features(event['user_id'], features)

Q18. What is Great Expectations and how do you use it for data validation?

import great_expectations as gx
import pandas as pd

context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("training_data_suite")

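# Define expectations (legacy validator workflow; attach the suite created above)
validator = context.get_validator(
    datasource_name="my_postgres",
    data_connector_name="default_inferred_data_connector",
    data_asset_name="churn_features",
    expectation_suite_name="training_data_suite"
)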

# Column existence
validator.expect_column_to_exist("user_id")
validator.expect_column_to_exist("total_spend_90d")

# Data quality
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("total_spend_90d", min_value=0)
validator.expect_column_values_to_be_in_set("country", ["IN", "US", "GB", "DE"])

# Distribution (statistical)
validator.expect_column_mean_to_be_between("total_spend_90d", min_value=100, max_value=5000)
validator.expect_column_stdev_to_be_between("total_spend_90d", min_value=50)

# Run validation
results = validator.validate()
if not results["success"]:
    failed_expectations = [r for r in results["results"] if not r["success"]]
    for r in failed_expectations:
        print(f"FAILED: {r['expectation_config']['expectation_type']}")
    raise ValueError(f"Data validation failed: {len(failed_expectations)} expectations failed")

Q19. What is ONNX and how does it enable cross-framework deployment?

import torch
import onnx
import onnxruntime as ort
import numpy as np

# Export PyTorch model to ONNX
model.eval()
dummy_input = torch.randn(1, 3, 224, 224)

torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    opset_version=17,
    do_constant_folding=True   # folds constant sub-expressions at export time
)

# Validate ONNX model
onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)

# Run with ONNX Runtime (CPU or CUDA)
ort_session = ort.InferenceSession("model.onnx",
                                     providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name
outputs = ort_session.run(None, {input_name: dummy_input.numpy()})
print(f"ONNX output shape: {outputs[0].shape}")

# ONNX Runtime speedup vs PyTorch eager: often 1.5-2x for inference
# TensorRT (NVIDIA): additional 2-4x via hardware-specific optimization

Q20. How do you handle model rollback in production?

# Model version management with instant rollback
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

def rollback_model(model_name: str, target_version: int):
    """Roll back to a specific model version."""
    # Get current production version
    current_prod = client.get_latest_versions(model_name, stages=["Production"])
    current_version = current_prod[0].version if current_prod else None

    print(f"Rolling back {model_name} from v{current_version} to v{target_version}")

    # Archive current production
    if current_version:
        client.transition_model_version_stage(
            name=model_name,
            version=current_version,
            stage="Archived"
        )

    # Promote target version to production
    client.transition_model_version_stage(
        name=model_name,
        version=target_version,
        stage="Production"
    )

    print(f"Rollback complete: v{target_version} is now Production")

# Automatic rollback based on metrics
class AutoRollback:
    def __init__(self, model_name, alert_threshold_drop=0.05):
        self.model_name = model_name
        self.threshold  = alert_threshold_drop
        self.baseline_auc = None

    def check_and_rollback(self, current_auc: float, previous_version: int):
        if self.baseline_auc is None:
            self.baseline_auc = current_auc
            return

        drop = (self.baseline_auc - current_auc) / self.baseline_auc
        if drop > self.threshold:
            print(f"AUC dropped {drop:.1%}. Triggering rollback.")
            rollback_model(self.model_name, previous_version)

HARD: Advanced MLOps (Questions 21-28)

Q21. How do you implement real-time feature computation with sub-10ms latency?

Architecture for sub-10ms feature serving:

Layer 1: Redis hot cache (< 0.5ms)
  - Precomputed features for active users
  - TTL: 1-24 hours depending on feature freshness requirement
  - Typical hit rate: 80-95% for recommendation/personalization

Layer 2: In-memory feature compute (1-5ms)
  - Lightweight compute on request payload
  - Example: "time since last login" from request metadata
  - No network call needed

Layer 3: Feature store API (5-15ms)
  - Feast or Tecton serving endpoint
  - Fall through if Redis cache miss

Layer 4: Default/fallback features (< 0.1ms)
  - Use population median values
  - For new users with no history
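
The four layers can be read as a fall-through lookup. The sketch below illustrates only the ordering, under stated assumptions: plain dicts and a callable stand in for Redis, the request payload, and the feature store API, and all names and values are hypothetical.

```python
def get_feature(user_id, name, hot_cache, request_features, store_lookup, defaults):
    """Resolve one feature by falling through the four layers in order.

    Returns (value, source) so the serving path can track hit rates per layer.
    """
    cached = hot_cache.get(user_id, {})
    if name in cached:                       # Layer 1: hot cache
        return cached[name], "cache"
    if name in request_features:             # Layer 2: computed from the request
        return request_features[name], "request"
    value = store_lookup(user_id, name)      # Layer 3: feature store API
    if value is not None:
        return value, "feature_store"
    return defaults[name], "default"         # Layer 4: population fallback

# Stand-ins for the real backends
hot_cache = {"u1": {"spend_90d": 812.5}}
request_features = {"secs_since_last_login": 42}
feature_store = {("u2", "spend_90d"): 130.0}
defaults = {"spend_90d": 0.0, "secs_since_last_login": 0}

def lookup(user_id, name):
    return feature_store.get((user_id, name))

print(get_feature("u1", "spend_90d", hot_cache, request_features, lookup, defaults))  # (812.5, 'cache')
print(get_feature("u2", "spend_90d", hot_cache, request_features, lookup, defaults))  # (130.0, 'feature_store')
print(get_feature("u3", "spend_90d", hot_cache, request_features, lookup, defaults))  # (0.0, 'default')
```

Returning the source alongside the value makes it cheap to export a per-layer hit rate, which is how the 80-95% cache hit figure above would be verified in practice.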
import asyncio
import json

import redis.asyncio as aioredis

class AsyncFeatureStore:
    def __init__(self):
        self.redis = None

    async def setup(self):
        self.redis = await aioredis.from_url("redis://localhost:6379")

    async def get_features_parallel(self, user_id: str, item_id: str) -> dict:
        # Fetch user and item features concurrently
        user_key  = f"user:{user_id}"
        item_key  = f"item:{item_id}"
        user_raw, item_raw = await asyncio.gather(
            self.redis.get(user_key),
            self.redis.get(item_key)
        )
        # Fall back to default features on a cache miss
        user_feats = json.loads(user_raw) if user_raw else self._default_user()
        item_feats = json.loads(item_raw) if item_raw else self._default_item()
        return {**user_feats, **item_feats}

    def _default_user(self): return {"age": 30, "spend_90d": 0, "logins_7d": 1}
    def _default_item(self): return {"avg_price": 500, "category_id": 0}
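
The four layers above can also be read as a fallback chain with a latency budget. The sketch below is a minimal illustration, not a production design: the dictionaries `HOT_CACHE`, `FEATURE_STORE`, and `DEFAULTS`, the helper names, and the 10 ms budget are all hypothetical stand-ins for Redis, a Feast/Tecton endpoint, and population medians. Layer 2 (request-time compute) is omitted for brevity.

```python
import asyncio

# Hypothetical in-process stand-ins for the real stores (assumptions for this sketch)
HOT_CACHE = {"user:1": {"spend_90d": 1200.0, "logins_7d": 5}}
FEATURE_STORE = {"user:2": {"spend_90d": 300.0, "logins_7d": 2}}
DEFAULTS = {"spend_90d": 0.0, "logins_7d": 1}  # e.g. population medians


async def cache_get(key: str):
    """Layer 1: hot cache lookup (Redis in the architecture above)."""
    return HOT_CACHE.get(key)


async def store_get(key: str, delay_s: float = 0.0):
    """Layer 3: feature store lookup; delay_s simulates network latency."""
    await asyncio.sleep(delay_s)
    return FEATURE_STORE.get(key)


async def get_user_features(user_id: str, budget_s: float = 0.010,
                            store_delay_s: float = 0.0) -> dict:
    key = f"user:{user_id}"
    feats = await cache_get(key)
    source = "cache"
    if feats is None:
        try:
            # Bound the slow path so a cache miss cannot blow the latency budget
            feats = await asyncio.wait_for(store_get(key, store_delay_s),
                                           timeout=budget_s)
            source = "store"
        except asyncio.TimeoutError:
            feats = None
    if feats is None:
        # Layer 4: defaults for unknown users or a timed-out store call
        feats, source = dict(DEFAULTS), "default"
    return {**feats, "_source": source}
```

Tagging the result with `_source` is one way to measure the hit rate of each layer in production; a rising share of `default` responses usually means the cache or the store is unhealthy.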

Q22. What is continuous training (CT) and how is it different from continuous delivery (CD)?

| Pipeline | Trigger | Artifact | Goal |
| --- | --- | --- | --- |
| Continuous Integration (CI) | Code change | Tested code | Catch bugs early |
| Continuous Delivery (CD) | Merge to main | Deployable build | Fast, reliable deploys |
| Continuous Training (CT) | New data or drift | Trained model | Keep model fresh |
| Continuous Evaluation (CE) | New labels | Performance metrics | Know if model is working |

# Continuous training pipeline with Prefect
from prefect import flow, task

@task(retries=3, retry_delay_seconds=60)
def fetch_new_training_data(since_date: str) -> str:
    # Pull labeled data from warehouse
    return "gs://bucket/training_data/"

@task
def validate_data(data_path: str) -> bool:
    # Run Great Expectations suite
    return True

@task
def train_model(data_path: str) -> str:
    # Train and return MLflow run ID
    return "run_abc123"

@task
def evaluate_model(run_id: str) -> float:
    # Return validation AUC
    return 0.87

@task
def deploy_if_better(run_id: str, new_auc: float, min_improvement: float = 0.001):
    import mlflow
    client = mlflow.MlflowClient()
    # Compare to production model
    # ... deploy logic ...
    pass

@flow
def weekly_retraining_flow():
    data_path = fetch_new_training_data(since_date="last_week")
    if validate_data(data_path):
        run_id = train_model(data_path)
        auc = evaluate_model(run_id)
        deploy_if_better(run_id, auc)

if __name__ == "__main__":
    # Prefect 2.x/3.x attach schedules to a deployment, not to the @flow decorator.
    # The deployment name "weekly-retraining" is illustrative.
    weekly_retraining_flow.serve(name="weekly-retraining", cron="0 2 * * 1")  # every Monday 2 AM
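
A cron schedule covers only one kind of trigger; the table above lists new data and drift as the triggers for CT. A rough sketch of a trigger check that a scheduler could call before starting a retraining run follows. The `RetrainPolicy` class, the `should_retrain` function, and every threshold are assumptions for illustration, not values from any particular platform.

```python
from dataclasses import dataclass


@dataclass
class RetrainPolicy:
    min_new_labels: int = 10_000   # hypothetical volume threshold
    max_psi: float = 0.2           # common rule-of-thumb drift threshold
    max_auc_drop: float = 0.05     # relative drop vs. baseline AUC


def should_retrain(new_labels: int, psi: float, baseline_auc: float,
                   current_auc: float,
                   policy: RetrainPolicy = RetrainPolicy()) -> list:
    """Return the list of reasons to retrain; an empty list means skip."""
    reasons = []
    if new_labels >= policy.min_new_labels:
        reasons.append("new_data")
    if psi > policy.max_psi:
        reasons.append("feature_drift")
    if baseline_auc > 0:
        relative_drop = (baseline_auc - current_auc) / baseline_auc
        if relative_drop > policy.max_auc_drop:
            reasons.append("performance_drop")
    return reasons
```

Returning the reasons rather than a bare boolean makes it easy to log why a run was started, which helps when auditing retraining cost later.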

Q23. How do you implement slice-based evaluation for fairness?

import pandas as pd
import numpy as np
from sklearn.metrics import roc_auc_score, f1_score

def slice_evaluation(model, X_test, y_test, slice_columns: list, min_slice_size=50):
    """
    Evaluate model performance on data slices (demographic groups, regions, etc.)
    Surfaces groups where model underperforms.
    """
    results = []
    y_scores = model.predict_proba(X_test)[:, 1]
    overall_auc = roc_auc_score(y_test, y_scores)

    for col in slice_columns:
        for value in X_test[col].unique():
            mask = X_test[col] == value
            n = mask.sum()

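            if n < min_slice_size:
                continue

            # AUC is undefined when a slice contains only one class; skip it
            if len(np.unique(y_test[mask])) < 2:
                continue

            slice_auc = roc_auc_score(y_test[mask], y_scores[mask])
            disparity = slice_auc - overall_auc  # negative = underperformance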

            results.append({
                'slice_col': col,
                'slice_value': value,
                'n': n,
                'auc': round(slice_auc, 4),
                'disparity_vs_overall': round(disparity, 4),
                'flag': disparity < -0.05  # flag if slice AUC is more than 0.05 below overall
            })

    df = pd.DataFrame(results).sort_values('auc')
    flagged = df[df['flag']]

    if len(flagged) > 0:
        print(f"WARNING: {len(flagged)} underperforming slices detected:")
        print(flagged.to_string())

    return df

# Typical slices to evaluate:
# - Gender, age_group, state/region (for credit/churn)
# - Device type, operating system (for recommendation)
# - Business size, industry (for B2B models)
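
AUC per slice shows ranking quality, but fairness reviews often also compare error rates at the decision threshold. As a complementary sketch (not part of the function above), the snippet below computes the true-positive rate per group and the gap between the best and worst group, sometimes called the equal-opportunity gap. The function names are hypothetical and the inputs are assumed to be binary labels and binary predictions.

```python
import numpy as np


def tpr_by_group(y_true, y_pred, groups) -> dict:
    """True-positive rate per group; groups with no positives are skipped."""
    y_true, y_pred, groups = map(np.asarray, (y_true, y_pred, groups))
    rates = {}
    for g in np.unique(groups):
        positives = (groups == g) & (y_true == 1)
        if positives.sum() == 0:
            continue
        # Mean of binary predictions over actual positives = recall for the group
        rates[str(g)] = float(y_pred[positives].mean())
    return rates


def equal_opportunity_gap(y_true, y_pred, groups) -> float:
    """Largest minus smallest group TPR; 0.0 means equal recall across groups."""
    rates = tpr_by_group(y_true, y_pred, groups)
    return max(rates.values()) - min(rates.values())


y_true = [1, 1, 1, 1, 0, 0]
y_pred = [1, 1, 1, 0, 0, 1]
groups = ["a", "a", "b", "b", "a", "b"]
gap = equal_opportunity_gap(y_true, y_pred, groups)
```

What counts as an acceptable gap depends on the product and any regulation that applies, so the threshold is best agreed with stakeholders rather than hard-coded.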

Q24. What is the Google ML Test Score and what does a mature ML system look like?

| Area | Tests | What Is Checked |
| --- | --- | --- |
| Features and data | Data schema, range, distribution | No silent data failures |
| Model development | Offline evaluation, training performance | Reproducibility, benchmarks |
| ML infrastructure | Training pipeline, serving pipeline | Can retrain and redeploy reliably |
| Monitoring | Alerting, dashboards, staleness | Know when model is failing |

Mature ML system checklist:

Data:
  [x] Feature distributions monitored continuously
  [x] Schema validation on every data ingestion
  [x] Training-serving skew check (automated comparison)
  [x] Data lineage tracked (what data trained which model)

Model:
  [x] Offline metrics regression test (new model must beat baseline)
  [x] Slice-based evaluation (no hidden underperformance on subgroups)
  [x] Reproducible training (same code + data = same model)
  [x] Unit tests for preprocessing transforms

Infrastructure:
  [x] Model can be retrained in < 4 hours (data to deployed endpoint)
  [x] Rollback to any previous version in < 5 minutes
  [x] Canary deployment with automatic abort on metric degradation
  [x] Load test for 2x expected peak traffic

Monitoring:
  [x] Prediction latency P50/P99 alerts
  [x] Feature drift alerts (PSI, KS test)
  [x] Model performance alerts (accuracy, AUC)
  [x] Upstream data freshness alerts
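
The checklist names PSI as a feature drift alert. One possible NumPy-only implementation is sketched below, assuming a numeric feature, quantile bins derived from the reference (training) sample, and a small `eps` to avoid taking the log of zero. The function name and defaults are illustrative.

```python
import numpy as np


def population_stability_index(reference, current, n_bins: int = 10,
                               eps: float = 1e-6) -> float:
    """PSI between a reference sample and a current sample of one feature."""
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    # Interior bin edges taken from quantiles of the reference sample
    inner_edges = np.quantile(reference, np.linspace(0, 1, n_bins + 1))[1:-1]
    ref_counts = np.bincount(
        np.searchsorted(inner_edges, reference, side="right"), minlength=n_bins)
    cur_counts = np.bincount(
        np.searchsorted(inner_edges, current, side="right"), minlength=n_bins)
    # Clip empty bins so the log ratio stays finite
    ref_frac = np.clip(ref_counts / len(reference), eps, None)
    cur_frac = np.clip(cur_counts / len(current), eps, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))
```

A widely quoted rule of thumb treats PSI below 0.1 as stable and above 0.2 as significant drift, but these cutoffs are conventions and are worth calibrating against each feature's normal week-to-week variation.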

Q25. How do you implement model explainability at scale?

import shap
import numpy as np
import json
from pathlib import Path

class ScalableExplainer:
    """SHAP-based explainer with caching for high-throughput serving."""

    def __init__(self, model, background_data, n_background=500):
        self.explainer = shap.TreeExplainer(model,
                                             data=background_data[:n_background])
        self.cache = {}

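    def explain(self, x: np.ndarray, request_id: str = None, cache=True) -> dict:
        # Return the cached explanation if this request was already explained
        if cache and request_id is not None and request_id in self.cache:
            return self.cache[request_id]

        # SHAP values for a single prediction; x must have shape (1, n_features)
        shap_values = self.explainer.shap_values(x)

        if isinstance(shap_values, list):
            # Older SHAP versions return one array per class: use class 1 (positive)
            sv = shap_values[1]
        elif shap_values.ndim == 3:
            # Newer SHAP versions return (n_samples, n_features, n_classes)
            sv = shap_values[:, :, 1]
        else:
            sv = shap_values

        explanation = {
            'base_value': float(self.explainer.expected_value
                                 if np.isscalar(self.explainer.expected_value)
                                 else self.explainer.expected_value[1]),
            'feature_contributions': {
                f'feature_{i}': float(sv[0, i]) for i in range(sv.shape[1])
            },
            'top_3_drivers': self._top_drivers(sv[0])
        }
        if cache and request_id is not None:
            self.cache[request_id] = explanation
        return explanation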

    def _top_drivers(self, shap_row: np.ndarray) -> list:
        idx = np.argsort(np.abs(shap_row))[::-1][:3]
        return [{'feature_idx': int(i), 'contribution': float(shap_row[i])} for i in idx]
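
For batch workloads it is often cheaper to compute SHAP values for many rows at once and store only the top drivers per row. The sketch below assumes the SHAP matrix has already been computed elsewhere (shape `n_rows x n_features`) and shows a vectorized top-k extraction. The function name `top_k_drivers` and the feature names are hypothetical.

```python
import numpy as np


def top_k_drivers(shap_matrix: np.ndarray, feature_names: list, k: int = 3) -> list:
    """For each row, return the k features with the largest |contribution|."""
    abs_vals = np.abs(shap_matrix)
    # argpartition selects the top-k per row without a full sort
    part = np.argpartition(-abs_vals, kth=k - 1, axis=1)[:, :k]
    row_idx = np.arange(shap_matrix.shape[0])[:, None]
    # Sort only the k selected columns by descending magnitude
    order = np.argsort(-abs_vals[row_idx, part], axis=1)
    top = part[row_idx, order]
    return [
        [(feature_names[j], float(shap_matrix[i, j])) for j in top[i]]
        for i in range(shap_matrix.shape[0])
    ]


shap_matrix = np.array([[0.1, -0.5, 0.3, 0.0],
                        [-0.2, 0.05, 0.0, 0.9]])
names = ["age", "spend", "logins", "tenure"]
drivers = top_k_drivers(shap_matrix, names, k=2)
```

Storing a handful of (feature, contribution) pairs per prediction keeps explanation tables small compared with persisting the full SHAP matrix.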

Q26. What is the difference between batch scoring and real-time scoring architectures?

| Property | Batch Scoring | Real-time Scoring |
| --- | --- | --- |
| Trigger | Scheduled (hourly/daily) | User request (on-demand) |
| Latency | Minutes to hours | < 100ms |
| Throughput | Millions of records | 100-10,000 RPS per server |
| Freshness | Stale by schedule interval | Fresh per request |
| Infrastructure | Spark, Databricks, BigQuery | FastAPI, Triton, BentoML |
| Feature freshness | Precomputed batch features OK | Online features required |
| Cost | Low (scheduled compute) | Higher (always-on servers) |

# Batch scoring with Spark
from pyspark.sql import SparkSession
import mlflow.spark

spark = SparkSession.builder.appName("BatchScoring").getOrCreate()

# Load trained model from MLflow
model_uri = "models:/churn-predictor/Production"
model = mlflow.spark.load_model(model_uri)

# Score 10M users in batch
users_df = spark.read.parquet("s3://data/users/features/2026-06-08/")
predictions = model.transform(users_df)
predictions.select("user_id", "prediction", "probability").write.parquet(
    "s3://data/predictions/churn/2026-06-08/"
)

# Real-time: see serving examples throughout this guide

Q27. How do you implement A/B testing for ML models with statistical rigor?

import numpy as np
from scipy.stats import ttest_ind, chi2_contingency
from scipy.stats import mannwhitneyu
import pandas as pd

class MLABTest:
    def __init__(self, control_data: pd.DataFrame, treatment_data: pd.DataFrame):
        self.control   = control_data
        self.treatment = treatment_data

    def test_conversion_rate(self, alpha=0.05) -> dict:
        """Two-proportion z-test for binary outcomes."""
        from statsmodels.stats.proportion import proportions_ztest
        count = np.array([
            self.treatment['converted'].sum(),
            self.control['converted'].sum()
        ])
        nobs = np.array([len(self.treatment), len(self.control)])
        stat, pval = proportions_ztest(count, nobs)

        control_rate   = self.control['converted'].mean()
        treatment_rate = self.treatment['converted'].mean()
        lift = (treatment_rate - control_rate) / control_rate

        return {
            'control_rate':   round(control_rate, 4),
            'treatment_rate': round(treatment_rate, 4),
            'lift_pct':       round(lift * 100, 2),
            'p_value':        round(pval, 4),
            'significant':    pval < alpha,
            'sample_sizes':   {'control': len(self.control), 'treatment': len(self.treatment)}
        }

    def sequential_test(self, alpha=0.05, max_n=50000) -> dict:
        """Sequential testing (avoids peeking problem with fixed-horizon tests)."""
        # SPRT (Sequential Probability Ratio Test) approach
        # Allows stopping early when sufficient evidence accumulates
        # Use when you cannot afford to wait for full sample
        pass

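# Power analysis (before running the test)
import math
from statsmodels.stats.power import NormalIndPower
from statsmodels.stats.proportion import proportion_effectsize

baseline_rate = 0.10                   # assumed control conversion rate; use your own
target_rate   = baseline_rate * 1.05   # detect 5% relative lift in conversion
# NormalIndPower expects a standardized effect size (Cohen's h), not the raw lift
effect_size = proportion_effectsize(target_rate, baseline_rate)
alpha = 0.05
power = 0.80

analysis = NormalIndPower()
required_n = analysis.solve_power(effect_size=effect_size,
                                   alpha=alpha, power=power, alternative='larger')
print(f"Required sample size per arm: {math.ceil(required_n)}")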
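
The `sequential_test` method above is left as a stub. As a rough illustration of the idea it describes, here is a minimal Wald SPRT for a single stream of binary outcomes. It is a simplification: it tests the treatment arm's conversion rate against a fixed baseline `p0` versus a target `p1`, rather than comparing two live arms, and the function name and return format are assumptions for this sketch.

```python
import math


def sprt_bernoulli(outcomes, p0: float, p1: float,
                   alpha: float = 0.05, beta: float = 0.20):
    """Wald SPRT for H0: p = p0 vs H1: p = p1 on a stream of 0/1 outcomes.

    Returns (decision, n_observations_used).
    """
    upper = math.log((1 - beta) / alpha)   # cross this: accept H1
    lower = math.log(beta / (1 - alpha))   # cross this: accept H0
    llr = 0.0                              # running log-likelihood ratio
    n = 0
    for n, converted in enumerate(outcomes, start=1):
        if converted:
            llr += math.log(p1 / p0)
        else:
            llr += math.log((1 - p1) / (1 - p0))
        if llr >= upper:
            return ("accept_h1", n)
        if llr <= lower:
            return ("accept_h0", n)
    return ("continue", n)
```

Because the stopping boundaries are fixed up front from `alpha` and `beta`, checking the result after every observation does not inflate the error rates the way repeated peeking at a fixed-horizon test does.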

Q28. Design a complete ML platform for an e-commerce company.

ML Platform Architecture for E-commerce (2026):

Data Layer:
  - Event streaming: Kafka (clickstream, purchases, search queries)
  - Data lake: S3/GCS with Delta Lake format (version-controlled tables)
  - Data warehouse: Snowflake or BigQuery (analytics, feature computation)
  - Feature store: Feast (offline: Snowflake, online: Redis)

Training Layer:
  - Experiment tracking: MLflow (self-hosted tracking server)
  - Training orchestration: Kubeflow Pipelines on Kubernetes
  - Compute: GPU clusters for deep learning; Spark on Databricks for tabular
  - HPO: Optuna (Bayesian) with distributed workers

Serving Layer:
  - REST API: FastAPI with uvicorn, containerized on Kubernetes
  - Batch scoring: Spark on Databricks, results to S3
  - Model server: Triton Inference Server for GPU-accelerated serving
  - CDN/caching: Varnish or CloudFront for latency reduction

MLOps Layer:
  - CI/CD: GitHub Actions -> Docker build -> Kubernetes deploy
  - Model registry: MLflow Model Registry (champion-challenger management)
  - Monitoring: Prometheus + Grafana (latency, throughput, drift)
  - Alerting: PagerDuty integration for P0 model degradation
  - Rollback: Automated rollback triggered by monitoring alerts

Key ML models:
  - Recommendation: Two-tower model, updated hourly
  - Search ranking: LightGBM with semantic features, updated daily
  - Fraud detection: LightGBM + GNN, real-time scoring
  - Churn prediction: XGBoost, weekly batch scoring
  - Pricing optimization: Contextual bandit, real-time
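
Champion-challenger management and canary rollouts both need a stable way to split traffic between model versions. One common approach, sketched here under assumptions, is to hash the user ID into a bucket so that each user consistently sees the same variant. The function name, the salt value, and the variant labels are hypothetical.

```python
import hashlib


def assign_variant(user_id: str, canary_pct: float,
                   salt: str = "churn-v2-canary") -> str:
    """Deterministically route a stable fraction of users to the canary model."""
    digest = hashlib.sha256(f"{salt}:{user_id}".encode()).hexdigest()
    # First 32 bits of the hash, scaled to a uniform value in [0, 1)
    bucket = int(digest[:8], 16) / 0x100000000
    return "canary" if bucket < canary_pct else "champion"


# Roughly 10% of users should land on the canary
canary_share = sum(
    assign_variant(f"user-{i}", 0.10) == "canary" for i in range(10_000)
) / 10_000
```

Changing the salt for each experiment reshuffles the buckets, so the same users are not always the first to be exposed to new models.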

MLOps Tools Reference 2026

| Category | Tools | Notes |
| --- | --- | --- |
| Experiment Tracking | MLflow, Weights and Biases | MLflow for self-hosted; W&B for teams |
| Pipeline Orchestration | Kubeflow, Apache Airflow, Prefect | Kubeflow for K8s-native; Prefect for simplicity |
| Feature Store | Feast, Tecton, Hopsworks | Feast open-source; Tecton enterprise |
| Model Serving | Triton, BentoML, TorchServe, Ray Serve | Triton for GPU; BentoML for simplicity |
| Model Monitoring | Evidently, Arize, WhyLabs | Evidently for open-source |
| Data Validation | Great Expectations, Pandera | Great Expectations more mature |
| Container Orchestration | Kubernetes | Standard for production ML at scale |

FAQ

Q: What is the difference between DevOps and MLOps engineers? A: A DevOps engineer focuses on application code deployment, CI/CD, and infrastructure. An MLOps engineer additionally manages data pipelines, model training pipelines, model versioning, and production model monitoring. The roles overlap significantly; many MLOps engineers come from DevOps backgrounds.

Q: Is Kubernetes required for MLOps? A: For production at scale, yes. Most MLOps platforms (Kubeflow, Seldon, KServe) are Kubernetes-native. Understand pods, deployments, services, ConfigMaps, horizontal pod autoscaling, and resource limits.

Q: What is the most common MLOps failure mode? A: Training-serving skew is the most insidious. Models perform well in offline evaluation but degrade in production because features are computed differently. The fix is to use a feature store or serialize preprocessing inside the model artifact.
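
The second fix mentioned in the last answer, serializing preprocessing inside the model artifact, can be illustrated with a scikit-learn `Pipeline`. This is a minimal sketch on synthetic data; the step names, the choice of scaler and classifier, and the use of `pickle` for the round trip are assumptions for illustration.

```python
import pickle

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic training data: three raw, unscaled features
rng = np.random.default_rng(0)
X = rng.normal(loc=50.0, scale=10.0, size=(200, 3))
y = (X[:, 0] > 50.0).astype(int)

# Scaling lives inside the artifact, so serving cannot apply different statistics
pipeline = Pipeline([
    ("scale", StandardScaler()),
    ("clf", LogisticRegression()),
])
pipeline.fit(X, y)

# Round trip: what training saves is exactly what serving loads
artifact = pickle.dumps(pipeline)
served = pickle.loads(artifact)

raw_request = X[:5]  # serving passes raw features, no separate preprocessing code
same = np.allclose(pipeline.predict_proba(raw_request),
                   served.predict_proba(raw_request))
```

The serving side only ever calls `predict_proba` on raw features, so there is no second copy of the scaling logic that could drift out of sync with training.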



Methodology applied to this article (last verified 8 Jun 2026)
Sources used
Public exam-pattern documents, official recruiter pages, and verified candidate reports on r/developersIndia and LinkedIn.
Verification window
Page last edited 8 Jun 2026 by Aditya Sharma. Numbers and patterns sanity-checked against the most recent 2026 cycle drives we tracked.
What we did NOT do
  • No fabricated salary numbers or success rates. If we quote a range, it's sourced.
  • No noun-substituted templates. This article was not generated by swapping company names in a stock prompt.
  • No paid placements, sponsored coaching links, or affiliate-shilled course pushes.
Verification policy: /editorial-standards/. Found something incorrect? Submit a correction - we respond within 48 hours.
