REST, GraphQL, and gRPC - When to Use Each
| Code | Meaning | Use Case |
|---|---|---|
| 200 OK | Success | GET, PUT, PATCH successful |
| 201 Created | Resource created | POST successful, return Location header |
| 204 No Content | Success, no body | DELETE successful |
| 400 Bad Request | Invalid input | Validation errors |
| 401 Unauthorized | Authentication required | Missing or invalid auth token |
| 403 Forbidden | No permission | Authenticated but not authorized |
| 404 Not Found | Resource doesn't exist | Invalid ID or deleted resource |
| 409 Conflict | Resource conflict | Duplicate email, version mismatch |
| 429 Too Many Requests | Rate limit exceeded | Return Retry-After header |
| 500 Internal Server Error | Server error | Unexpected error |
import os
from datetime import datetime
from functools import wraps

import jwt
from flask import Flask, request, jsonify, url_for
from flask_sqlalchemy import SQLAlchemy
# Application and database setup.
app = Flask(__name__)
# The connection string can be overridden per environment; the original
# hard-coded value remains the default.
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get(
    'DATABASE_URL', 'postgresql://localhost/api_db'
)
# require_auth verifies JWTs with app.config['SECRET_KEY'], which was never
# configured. Read it from the environment; the fallback is for local
# development only and must be overridden in any real deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-only-change-me')
db = SQLAlchemy(app)
# Models
class User(db.Model):
    """Account record identified by a unique username and e-mail address."""

    id = db.Column(db.Integer, primary_key=True)
    # Both identifiers are mandatory and must be unique across all users.
    username = db.Column(db.String(80), nullable=False, unique=True)
    email = db.Column(db.String(120), nullable=False, unique=True)
    # Defaults to datetime.utcnow when no value is supplied.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
class Product(db.Model):
    """Catalogue item exposed through the /api/v1/products endpoints."""

    id = db.Column(db.Integer, primary_key=True)
    # Name and price are the only mandatory business fields.
    name = db.Column(db.String(100), nullable=False)
    price = db.Column(db.Float, nullable=False)
    # Optional free-form description.
    description = db.Column(db.Text)
    # Units on hand; defaults to 0.
    stock = db.Column(db.Integer, default=0)
    # Defaults to datetime.utcnow when no value is supplied.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
# Authentication decorator
def require_auth(f):
    """
    Decorator that rejects requests lacking a valid JWT.

    The token is read from the ``Authorization`` header, either bare or in
    the standard ``Bearer <token>`` form, and verified with HS256 against
    ``app.config['SECRET_KEY']``. On success the ``user_id`` claim is stored
    on ``request.user_id`` and the wrapped view is called. Otherwise a JSON
    error body with status 401 is returned.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        header = request.headers.get('Authorization', '').strip()
        if not header:
            return jsonify({'error': 'Authorization required'}), 401

        # Accept "Bearer <token>" as well as a bare token, so standard
        # clients are not rejected for using the conventional scheme.
        scheme, _, credentials = header.partition(' ')
        token = credentials.strip() if scheme.lower() == 'bearer' else header
        if not token:
            return jsonify({'error': 'Authorization required'}), 401

        try:
            # Verify JWT token
            payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            request.user_id = payload['user_id']
        except (jwt.InvalidTokenError, KeyError):
            # KeyError: a well-signed token that carries no user_id claim.
            return jsonify({'error': 'Invalid token'}), 401
        return f(*args, **kwargs)
    return decorated
# ============================================
# RESTful Routes
# ============================================
# ✅ Good REST Design
# Resource-based URLs, proper HTTP methods
# List all products (with pagination)
@app.route('/api/v1/products', methods=['GET'])
def get_products():
    """
    GET /api/v1/products?page=1&per_page=20&sort=price&order=asc
    Returns paginated list of products

    Query parameters:
        page:     page number (default 1)
        per_page: page size, at most 100 (default 20)
        sort:     one of id, name, price, stock, created_at
                  (default created_at)
        order:    'asc' sorts ascending; any other value sorts descending

    Responds with 400 when per_page exceeds 100 or sort is not a known field.
    """
    # Only these columns may be sorted on. The value comes straight from the
    # query string, so it must never reach getattr() unchecked.
    sortable_fields = ('id', 'name', 'price', 'stock', 'created_at')

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    sort_by = request.args.get('sort', 'created_at')
    order = 'asc' if request.args.get('order', 'desc') == 'asc' else 'desc'

    # Validate pagination and sorting
    if per_page > 100:
        return jsonify({'error': 'per_page max is 100'}), 400
    if sort_by not in sortable_fields:
        return jsonify({
            'error': 'sort must be one of: ' + ', '.join(sortable_fields)
        }), 400

    # Build query
    column = getattr(Product, sort_by)
    ordering = column.asc() if order == 'asc' else column.desc()

    # Paginate
    pagination = Product.query.order_by(ordering).paginate(
        page=page, per_page=per_page, error_out=False
    )

    def page_url(number):
        # Carry sort/order along so following a link keeps the same ordering.
        return url_for(
            'get_products',
            page=number,
            per_page=pagination.per_page,
            sort=sort_by,
            order=order,
            _external=True,
        )

    return jsonify({
        'products': [
            {
                'id': p.id,
                'name': p.name,
                'price': p.price,
                'stock': p.stock,
                'url': url_for('get_product', product_id=p.id, _external=True)
            }
            for p in pagination.items
        ],
        'pagination': {
            # Report the values the paginator actually used, not the raw
            # request values, so the metadata matches the returned items.
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total': pagination.total,
            'pages': pagination.pages,
            'next': page_url(pagination.next_num) if pagination.has_next else None,
            'prev': page_url(pagination.prev_num) if pagination.has_prev else None
        }
    }), 200
# Get single product
@app.route('/api/v1/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
    """
    GET /api/v1/products/123
    Returns the full representation of one product together with
    hypermedia links to itself and to its update/delete endpoints.
    Unknown ids are answered with 404 by get_or_404.
    """
    item = Product.query.get_or_404(product_id)

    def link_to(endpoint):
        # Absolute URL of another product endpoint for this same item.
        return url_for(endpoint, product_id=item.id, _external=True)

    body = {
        'id': item.id,
        'name': item.name,
        'description': item.description,
        'price': item.price,
        'stock': item.stock,
        'created_at': item.created_at.isoformat(),
        '_links': {
            'self': link_to('get_product'),
            'update': link_to('update_product'),
            'delete': link_to('delete_product')
        }
    }
    return jsonify(body), 200
# Create product
@app.route('/api/v1/products', methods=['POST'])
@require_auth
def create_product():
    """
    POST /api/v1/products
    Body: {"name": "...", "price": 99.99, "stock": 10}
    Returns: 201 Created with Location header

    Responds with 400 and an ``errors`` object when the body is not a JSON
    object, the name is missing/blank, the price is not a positive number,
    or the stock is not a non-negative integer.
    """
    # silent=True yields None for a missing or malformed body, so the
    # problem is reported through the same JSON error shape as other
    # validation failures instead of crashing on data.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'errors': {'body': 'JSON object required'}}), 400

    # Validate input
    errors = {}
    name = data.get('name')
    if not isinstance(name, str) or not name.strip():
        errors['name'] = 'Name is required'

    price = data.get('price')
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(price, bool) or not isinstance(price, (int, float)) or price <= 0:
        errors['price'] = 'Valid price is required'

    stock = data.get('stock', 0)
    if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
        errors['stock'] = 'Stock must be a non-negative integer'

    if errors:
        return jsonify({'errors': errors}), 400

    # Create product
    product = Product(
        name=name,
        description=data.get('description', ''),
        price=price,
        stock=stock
    )
    db.session.add(product)
    db.session.commit()

    # Return 201 with Location header
    location = url_for('get_product', product_id=product.id, _external=True)
    response = jsonify({
        'id': product.id,
        'name': product.name,
        'price': product.price,
        'stock': product.stock,
        '_links': {
            'self': location
        }
    })
    response.status_code = 201
    response.headers['Location'] = location
    return response
# Update product (full replacement)
@app.route('/api/v1/products/<int:product_id>', methods=['PUT'])
@require_auth
def update_product(product_id):
    """
    PUT /api/v1/products/123
    Full replacement of resource

    ``name`` and ``price`` are required; ``description`` and ``stock`` are
    reset to '' and 0 when omitted. Responds with 400 when the body is not
    a JSON object or a field is invalid, and with 404 for an unknown id.
    """
    product = Product.query.get_or_404(product_id)

    # silent=True returns None for a missing/malformed body instead of
    # letting data.get() fail on a non-dict value.
    data = request.get_json(silent=True)

    # Validate
    if not isinstance(data, dict) or not data.get('name') or not data.get('price'):
        return jsonify({'error': 'Name and price required'}), 400

    name, price = data['name'], data['price']
    if not isinstance(name, str) or not name.strip():
        return jsonify({'error': 'Name must be a non-empty string'}), 400
    # Same rule as on creation; bool is excluded because it subclasses int.
    if isinstance(price, bool) or not isinstance(price, (int, float)) or price <= 0:
        return jsonify({'error': 'Price must be a positive number'}), 400

    stock = data.get('stock', 0)
    if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
        return jsonify({'error': 'Stock must be a non-negative integer'}), 400

    # Update all fields
    product.name = name
    product.description = data.get('description', '')
    product.price = price
    product.stock = stock
    db.session.commit()

    return jsonify({
        'id': product.id,
        'name': product.name,
        'price': product.price,
        'stock': product.stock
    }), 200
# Partial update
@app.route('/api/v1/products/<int:product_id>', methods=['PATCH'])
@require_auth
def partial_update_product(product_id):
    """
    PATCH /api/v1/products/123
    Partial update (only provided fields)

    Accepts any subset of name, price, stock and description. Responds with
    400 when the body is not a JSON object or a provided field is invalid,
    and with 404 for an unknown id.
    """
    product = Product.query.get_or_404(product_id)

    # silent=True returns None for a missing/malformed body instead of
    # letting the membership tests below fail on a non-dict value.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON object required'}), 400

    # Validate only what was sent. name and price are NOT NULL columns, so
    # bad values are rejected here rather than surfacing as a database error.
    if 'name' in data:
        name = data['name']
        if not isinstance(name, str) or not name.strip():
            return jsonify({'error': 'Name must be a non-empty string'}), 400
    if 'price' in data:
        price = data['price']
        if isinstance(price, bool) or not isinstance(price, (int, float)) or price <= 0:
            return jsonify({'error': 'Price must be a positive number'}), 400
    if 'stock' in data:
        stock = data['stock']
        if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
            return jsonify({'error': 'Stock must be a non-negative integer'}), 400

    # Update only provided fields
    for field in ('name', 'price', 'stock', 'description'):
        if field in data:
            setattr(product, field, data[field])
    db.session.commit()

    return jsonify({
        'id': product.id,
        'name': product.name,
        'price': product.price,
        'stock': product.stock
    }), 200
# Delete product
@app.route('/api/v1/products/<int:product_id>', methods=['DELETE'])
@require_auth
def delete_product(product_id):
    """
    DELETE /api/v1/products/123
    Removes the product and answers 204 No Content with an empty body.
    Unknown ids are answered with 404 by get_or_404.
    """
    target = Product.query.get_or_404(product_id)
    session = db.session
    session.delete(target)
    session.commit()
    return '', 204
# Nested resource example
@app.route('/api/v1/users/<int:user_id>/orders', methods=['GET'])
def get_user_orders(user_id):
    """
    GET /api/v1/users/123/orders
    Nested resource: lists the orders that belong to one user.
    Unknown user ids are answered with 404 by get_or_404.
    """
    owner = User.query.get_or_404(user_id)

    summaries = []
    # Assuming an ``orders`` relationship is defined on User.
    for entry in owner.orders:
        summaries.append({
            'id': entry.id,
            'total': entry.total,
            'status': entry.status,
            'url': url_for('get_order', order_id=entry.id, _external=True)
        })

    return jsonify({'user_id': owner.id, 'orders': summaries}), 200
# ============================================
# Error Handling
# ============================================
@app.errorhandler(404)
def not_found(error):
    """Render every 404 as a JSON error body."""
    body = {'error': 'Resource not found'}
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Roll back the current DB session, then render the 500 as JSON."""
    # Discard whatever the failed request left pending in the session.
    db.session.rollback()
    body = {'error': 'Internal server error'}
    return jsonify(body), 500
@app.errorhandler(429)
def rate_limit_exceeded(error):
    """
    Render a 429 as JSON and tell the client when to retry.

    The delay is sent both in the body (``retry_after``) and in the
    ``Retry-After`` response header, which is what HTTP clients look at.
    """
    retry_after_seconds = 60
    response = jsonify({
        'error': 'Rate limit exceeded',
        'retry_after': retry_after_seconds
    })
    response.status_code = 429
    response.headers['Retry-After'] = str(retry_after_seconds)
    return response
# ============================================
# API Versioning
# ============================================
# Version in URL (recommended)
@app.route('/api/v1/products')
@app.route('/api/v2/products')
def versioned_endpoint():
    """
    URL versioning is most explicit and cache-friendly

    Placeholder view: reports which API version the matched URL belongs to.
    A bare ``pass`` would return None, which Flask does not accept as a
    response.
    """
    # NOTE(review): '/api/v1/products' is also routed to get_products in this
    # file - confirm which registration is meant to serve v1.
    version = 2 if request.path.startswith('/api/v2/') else 1
    return jsonify({'api_version': version}), 200
# Version in header (alternative)
@app.before_request
def check_version():
    """
    Header-based versioning, e.g.
    Accept: application/vnd.myapi.v2+json

    Sets request.api_version to 2 when the Accept header mentions the v2
    vendor media type, and to 1 in every other case.
    """
    wants_v2 = 'vnd.myapi.v2' in request.headers.get('Accept', '')
    request.api_version = 2 if wants_v2 else 1
Why GraphQL?
import graphene
from graphene import relay
from graphene_sqlalchemy import SQLAlchemyObjectType, SQLAlchemyConnectionField
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
# Declarative base class shared by the ORM models below.
Base = declarative_base()
# Engine for the PostgreSQL database backing the GraphQL API.
engine = create_engine('postgresql://localhost/graphql_db')
# Session factory bound to that engine; resolvers call Session() to get one.
Session = sessionmaker(bind=engine)
# SQLAlchemy Models
class UserModel(Base):
    """ORM mapping for the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String)
    email = Column(String)
    # Defaults to datetime.utcnow when no value is supplied.
    created_at = Column(DateTime, default=datetime.utcnow)

    # Orders placed by this user; mirrored by OrderModel.user.
    orders = relationship("OrderModel", back_populates="user")
class ProductModel(Base):
    """ORM mapping for the ``products`` table."""

    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    # Descriptive fields
    name = Column(String)
    description = Column(String)
    # Commercial fields
    price = Column(Float)
    stock = Column(Integer)
class OrderModel(Base):
    """ORM mapping for the ``orders`` table; each order references a user."""

    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    # Foreign key to users.id
    user_id = Column(Integer, ForeignKey('users.id'))
    total = Column(Float)
    status = Column(String)
    # Defaults to datetime.utcnow when no value is supplied.
    created_at = Column(DateTime, default=datetime.utcnow)

    # Owning user; mirrored by UserModel.orders.
    user = relationship("UserModel", back_populates="orders")
# GraphQL Types
class User(SQLAlchemyObjectType):
    """GraphQL type generated from UserModel, exposed as a Relay node."""

    class Meta:
        interfaces = (relay.Node,)
        model = UserModel
class Product(SQLAlchemyObjectType):
    """GraphQL type generated from ProductModel, exposed as a Relay node."""

    class Meta:
        interfaces = (relay.Node,)
        model = ProductModel
class Order(SQLAlchemyObjectType):
    """GraphQL type generated from OrderModel, exposed as a Relay node."""

    class Meta:
        interfaces = (relay.Node,)
        model = OrderModel
# Queries
class Query(graphene.ObjectType):
    """
    Define all read operations
    """
    # NOTE(review): the resolvers below open a session per call and never
    # close it. Closing it inside a resolver would presumably detach the
    # returned objects before nested fields such as ``orders`` are resolved;
    # consider a request-scoped session instead.

    node = relay.Node.Field()

    # Get all users
    all_users = SQLAlchemyConnectionField(User.connection)

    # Get all products
    all_products = SQLAlchemyConnectionField(Product.connection)

    # Get single user by ID
    user = graphene.Field(User, id=graphene.Int())

    # Get single product by ID
    product = graphene.Field(Product, id=graphene.Int())

    # Search products
    search_products = graphene.List(
        Product,
        query=graphene.String(required=True),
        min_price=graphene.Float(),
        max_price=graphene.Float()
    )

    def resolve_user(self, info, id=None):
        """Resolver for single user; returns None when no id is given."""
        # ``id`` is optional in the schema, so it must be optional here too.
        if id is None:
            return None
        session = Session()
        return session.query(UserModel).get(id)

    def resolve_product(self, info, id=None):
        """Resolver for single product; returns None when no id is given."""
        if id is None:
            return None
        session = Session()
        return session.query(ProductModel).get(id)

    def resolve_search_products(self, info, query, min_price=None, max_price=None):
        """
        Resolver with filtering
        This is where GraphQL shines - flexible querying

        Matches products whose name contains ``query`` (case-insensitive),
        optionally restricted to the inclusive [min_price, max_price] range.
        """
        session = Session()

        # Escape LIKE wildcards so the user's text is matched literally
        # (otherwise a search for "%" would match every product).
        literal = (
            query.replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )

        # Build query
        q = session.query(ProductModel).filter(
            ProductModel.name.ilike(f'%{literal}%', escape='\\')
        )
        if min_price is not None:
            q = q.filter(ProductModel.price >= min_price)
        if max_price is not None:
            q = q.filter(ProductModel.price <= max_price)
        return q.all()
# Mutations (Create, Update, Delete)
class CreateProduct(graphene.Mutation):
    """
    Mutation to create a product
    """
    class Arguments:
        name = graphene.String(required=True)
        description = graphene.String()
        price = graphene.Float(required=True)
        stock = graphene.Int()

    # Return type
    product = graphene.Field(lambda: Product)
    ok = graphene.Boolean()

    def mutate(self, info, name, price, description=None, stock=0):
        """Insert a new product row and return it with ``ok=True``."""
        session = Session()
        try:
            product = ProductModel(
                name=name,
                description=description,
                price=price,
                stock=stock
            )
            session.add(product)
            session.commit()
            # Reload the column values (including the generated id) so they
            # stay readable after the session is closed.
            session.refresh(product)
            return CreateProduct(product=product, ok=True)
        finally:
            # Always release the session; close() also discards any work
            # left uncommitted by a failure above.
            session.close()
class UpdateProduct(graphene.Mutation):
    """
    Mutation to update a product
    """
    class Arguments:
        id = graphene.Int(required=True)
        name = graphene.String()
        description = graphene.String()
        price = graphene.Float()
        stock = graphene.Int()

    product = graphene.Field(lambda: Product)
    ok = graphene.Boolean()

    def mutate(self, info, id, **kwargs):
        """
        Apply the supplied fields to product ``id``.

        Returns ``ok=False`` with no product when the id does not exist.
        """
        session = Session()
        try:
            product = session.query(ProductModel).get(id)
            if not product:
                return UpdateProduct(product=None, ok=False)

            # Update fields (kwargs holds only the arguments the client sent)
            for key, value in kwargs.items():
                setattr(product, key, value)
            session.commit()
            # Reload the column values so they stay readable after close().
            session.refresh(product)
            return UpdateProduct(product=product, ok=True)
        finally:
            # Always release the session; close() also discards any work
            # left uncommitted by a failure above.
            session.close()
class DeleteProduct(graphene.Mutation):
    """
    Mutation to delete a product
    """
    class Arguments:
        id = graphene.Int(required=True)

    ok = graphene.Boolean()

    def mutate(self, info, id):
        """Delete product ``id``; ``ok`` is False when it does not exist."""
        session = Session()
        try:
            product = session.query(ProductModel).get(id)
            if not product:
                return DeleteProduct(ok=False)
            session.delete(product)
            session.commit()
            return DeleteProduct(ok=True)
        finally:
            # Always release the session; close() also discards any work
            # left uncommitted by a failure above.
            session.close()
class Mutation(graphene.ObjectType):
    """Root mutation type: groups every write operation of the schema."""

    # One field per mutation class defined above.
    create_product = CreateProduct.Field()
    update_product = UpdateProduct.Field()
    delete_product = DeleteProduct.Field()
# Create schema
# Executable schema combining the read (Query) and write (Mutation) roots.
schema = graphene.Schema(query=Query, mutation=Mutation)
# ============================================
# Flask integration
# ============================================
from flask import Flask
from flask_graphql import GraphQLView
# Flask application that serves the schema on a single /graphql endpoint.
app = Flask(__name__)
app.add_url_rule(
    '/graphql',
    view_func=GraphQLView.as_view(
        'graphql',
        schema=schema,
        graphiql=True  # Enable GraphiQL IDE
    )
)

if __name__ == '__main__':
    # Create any missing tables before serving.
    Base.metadata.create_all(engine)
    # NOTE(review): debug=True and graphiql=True look like development
    # settings - confirm they are turned off for production.
    app.run(debug=True)
# Query 1: Get specific fields only
# ✅ No over-fetching - client decides what to fetch
{
user(id: 1) {
username
email
}
}
# Query 2: Nested queries (solve N+1 with DataLoader)
# ✅ Get user and all their orders in one request
{
user(id: 1) {
username
email
orders {
id
total
status
createdAt
}
}
}
# Query 3: Multiple queries in one request
{
user1: user(id: 1) {
username
}
user2: user(id: 2) {
username
}
topProducts: searchProducts(query: "laptop", maxPrice: 1000) {
name
price
}
}
# Query 4: Fragments for reusability
fragment UserDetails on User {
id
username
email
createdAt
}
{
user(id: 1) {
...UserDetails
orders {
total
}
}
}
# Mutation 1: Create product
mutation {
createProduct(
name: "MacBook Pro"
description: "16-inch M3 Max"
price: 2499.99
stock: 10
) {
product {
id
name
price
}
ok
}
}
# Mutation 2: Update product
mutation {
updateProduct(
id: 1
price: 2299.99
stock: 15
) {
product {
id
name
price
stock
}
ok
}
}
Why gRPC?
# product.proto
"""
syntax = "proto3";
package ecommerce;
// Product message
message Product {
int32 id = 1;
string name = 2;
string description = 3;
double price = 4;
int32 stock = 5;
int64 created_at = 6;
}
// Request/Response messages
message GetProductRequest {
int32 id = 1;
}
message GetProductResponse {
Product product = 1;
}
message ListProductsRequest {
int32 page = 1;
int32 page_size = 2;
string sort_by = 3;
}
message ListProductsResponse {
repeated Product products = 1;
int32 total = 2;
int32 page = 3;
int32 total_pages = 4;
}
message CreateProductRequest {
string name = 1;
string description = 2;
double price = 3;
int32 stock = 4;
}
message CreateProductResponse {
Product product = 1;
}
message UpdateProductRequest {
int32 id = 1;
string name = 2;
string description = 3;
double price = 4;
int32 stock = 5;
}
message UpdateProductResponse {
Product product = 1;
}
message DeleteProductRequest {
int32 id = 1;
}
message DeleteProductResponse {
bool success = 1;
}
// Streaming example
message ProductUpdate {
int32 product_id = 1;
double new_price = 2;
int32 new_stock = 3;
int64 timestamp = 4;
}
// Service definition
service ProductService {
  // Unary RPC (request-response)
  rpc GetProduct(GetProductRequest) returns (GetProductResponse);
  rpc ListProducts(ListProductsRequest) returns (ListProductsResponse);
  rpc CreateProduct(CreateProductRequest) returns (CreateProductResponse);
  rpc UpdateProduct(UpdateProductRequest) returns (UpdateProductResponse);
  rpc DeleteProduct(DeleteProductRequest) returns (DeleteProductResponse);
  // Server streaming (server sends stream of responses)
  rpc WatchProducts(GetProductRequest) returns (stream ProductUpdate);
  // Client streaming (client sends stream of requests)
  // Returns a count, matching what the Python server builds and the
  // Python client reads (response.created_count).
  rpc BatchCreateProducts(stream CreateProductRequest) returns (BatchCreateResponse);
  // Bidirectional streaming
  rpc LivePriceUpdates(stream ProductUpdate) returns (stream ProductUpdate);
}
// Summary returned once a BatchCreateProducts stream has been consumed
message BatchCreateResponse {
  int32 created_count = 1;
}
"""
# Generate Python code from .proto:
# python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. product.proto
import grpc
from concurrent import futures
import time
from datetime import datetime
# Import generated code (from .proto compilation)
# import product_pb2
# import product_pb2_grpc
# For this example, we'll simulate the generated classes
class ProductServiceServicer:
    """
    gRPC server implementation
    Implements methods defined in .proto service

    Collaborators are injected through the constructor:
      db     - object providing get_product, list_products, create_product
               and update_price, as called by the methods below
      pubsub - object providing subscribe(channel_name)
    """

    def __init__(self, db=None, pubsub=None):
        # The RPC methods read self.db / self.pubsub, so both attributes must
        # exist. Both are optional so ProductServiceServicer() keeps working.
        self.db = db
        self.pubsub = pubsub

    @staticmethod
    def _to_proto(product):
        """Convert a stored product into a product_pb2.Product message."""
        fields = {
            'id': product.id,
            'name': product.name,
            'description': product.description,
            'price': product.price,
            'stock': product.stock,
        }
        # created_at is sent as whole seconds; skip it when absent.
        created_at = getattr(product, 'created_at', None)
        if created_at is not None:
            fields['created_at'] = int(created_at.timestamp())
        return product_pb2.Product(**fields)

    def GetProduct(self, request, context):
        """
        Unary RPC: Single request, single response
        Sets NOT_FOUND and returns an empty response for an unknown id.
        """
        product_id = request.id

        # Fetch from database
        product = self.db.get_product(product_id)
        if not product:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f'Product {product_id} not found')
            return product_pb2.GetProductResponse()

        # Build response
        return product_pb2.GetProductResponse(product=self._to_proto(product))

    def ListProducts(self, request, context):
        """
        List products with pagination
        Unset or non-positive page / page_size fall back to 1 / 20;
        page_size is capped at 100.
        """
        # Negative values must not reach the total_pages division below.
        page = request.page if request.page > 0 else 1
        requested_size = request.page_size if request.page_size > 0 else 20
        page_size = min(requested_size, 100)  # Max 100 per page

        # Fetch from database
        products, total = self.db.list_products(
            page=page,
            page_size=page_size,
            sort_by=request.sort_by
        )

        # Build response
        return product_pb2.ListProductsResponse(
            products=[self._to_proto(p) for p in products],
            total=total,
            page=page,
            # Ceiling division without floats
            total_pages=(total + page_size - 1) // page_size
        )

    def CreateProduct(self, request, context):
        """
        Create new product
        Sets INVALID_ARGUMENT and returns an empty response when the name is
        empty or the price is not positive.
        """
        # Validate
        if not request.name or request.price <= 0:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('Invalid product data')
            return product_pb2.CreateProductResponse()

        # Create in database
        product = self.db.create_product(
            name=request.name,
            description=request.description,
            price=request.price,
            stock=request.stock
        )

        # Return created product
        return product_pb2.CreateProductResponse(product=self._to_proto(product))

    def WatchProducts(self, request, context):
        """
        Server streaming: Send updates as they happen
        Client receives stream of updates
        """
        product_id = request.id

        # Subscribe to product updates (e.g., Redis pub/sub)
        subscription = self.pubsub.subscribe(f'product:{product_id}')
        try:
            for message in subscription:
                # Send update to client
                yield product_pb2.ProductUpdate(
                    product_id=product_id,
                    new_price=message['price'],
                    new_stock=message['stock'],
                    timestamp=int(time.time())
                )
        finally:
            # Release the subscription however the stream ends: client
            # disconnect (GeneratorExit), an error, or normal exhaustion.
            subscription.unsubscribe()

    def BatchCreateProducts(self, request_iterator, context):
        """
        Client streaming: Client sends stream of products
        Server returns single response at the end
        Failed items are reported and skipped; only successes are counted.
        """
        created_count = 0
        for create_request in request_iterator:
            try:
                self.db.create_product(
                    name=create_request.name,
                    description=create_request.description,
                    price=create_request.price,
                    stock=create_request.stock
                )
                created_count += 1
            except Exception as e:
                # Best effort: one bad item must not abort the whole batch.
                print(f"Error creating product: {e}")

        return product_pb2.BatchCreateResponse(
            created_count=created_count
        )

    def LivePriceUpdates(self, request_iterator, context):
        """
        Bidirectional streaming:
        - Client sends price updates
        - Server applies each one and echoes the result back on this stream
        Updates for unknown products are ignored. Fan-out to other connected
        clients is not implemented here.
        """
        for update in request_iterator:
            # Validate and apply update
            product = self.db.get_product(update.product_id)
            if not product:
                continue

            # Update price
            self.db.update_price(update.product_id, update.new_price)

            # Yield to this client
            yield product_pb2.ProductUpdate(
                product_id=update.product_id,
                new_price=update.new_price,
                new_stock=product.stock,
                timestamp=int(time.time())
            )
def serve(port=50051, max_workers=10):
    """
    Start gRPC server and block until it terminates.

    Args:
        port: TCP port to listen on (all interfaces, insecure channel).
        max_workers: size of the thread pool that handles RPCs.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))

    # Add servicer to server
    product_pb2_grpc.add_ProductServiceServicer_to_server(
        ProductServiceServicer(),
        server
    )

    # Listen on port
    server.add_insecure_port(f'[::]:{port}')
    print(f"gRPC server starting on port {port}...")
    server.start()
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl-C: give in-flight RPCs a few seconds to finish, then stop.
        server.stop(grace=5)
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    serve()
import grpc
# import product_pb2
# import product_pb2_grpc
class ProductClient:
    """
    gRPC client for ProductService

    Usable as a context manager so the channel is closed even when a call
    raises:

        with ProductClient() as client:
            client.get_product(1)
    """

    def __init__(self, host='localhost', port=50051):
        # Create channel
        self.channel = grpc.insecure_channel(f'{host}:{port}')
        # Create stub (client)
        self.stub = product_pb2_grpc.ProductServiceStub(self.channel)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the with-block.
        return False

    def get_product(self, product_id):
        """
        Unary call: Get single product
        Returns the product message, or None when the RPC fails.
        """
        request = product_pb2.GetProductRequest(id=product_id)
        try:
            response = self.stub.GetProduct(request)
        except grpc.RpcError as e:
            print(f"Error: {e.code()} - {e.details()}")
            return None
        print(f"Product: {response.product.name} - ${response.product.price}")
        return response.product

    def list_products(self, page=1, page_size=20, sort_by='price'):
        """
        List products with pagination
        Returns the products of the requested page; RPC errors propagate.
        """
        request = product_pb2.ListProductsRequest(
            page=page,
            page_size=page_size,
            sort_by=sort_by
        )
        response = self.stub.ListProducts(request)
        print(f"Total products: {response.total}")
        for product in response.products:
            print(f" - {product.name}: ${product.price}")
        return response.products

    def create_product(self, name, price, description='', stock=0):
        """
        Create new product
        Returns the created product message; RPC errors propagate.
        """
        request = product_pb2.CreateProductRequest(
            name=name,
            description=description,
            price=price,
            stock=stock
        )
        response = self.stub.CreateProduct(request)
        print(f"Created product: {response.product.name} (ID: {response.product.id})")
        return response.product

    def watch_product_updates(self, product_id):
        """
        Server streaming: Watch for product updates
        Blocks, printing each update, until the stream ends or Ctrl-C.
        """
        request = product_pb2.GetProductRequest(id=product_id)
        # Receive stream of updates
        try:
            for update in self.stub.WatchProducts(request):
                print(f"Update for product {update.product_id}:")
                print(f" Price: ${update.new_price}")
                print(f" Stock: {update.new_stock}")
        except KeyboardInterrupt:
            print("Stopped watching")

    def batch_create(self, products):
        """
        Client streaming: Send multiple products
        ``products`` is an iterable of dicts with 'name' and 'price' and
        optional 'description' / 'stock'. Returns the number of products the
        server reports as created.
        """
        # Lazily build one request per item; the stub consumes the stream.
        requests = (
            product_pb2.CreateProductRequest(
                name=item['name'],
                description=item.get('description', ''),
                price=item['price'],
                stock=item.get('stock', 0)
            )
            for item in products
        )
        response = self.stub.BatchCreateProducts(requests)
        print(f"Created {response.created_count} products")
        return response.created_count

    def close(self):
        """Close channel"""
        self.channel.close()
# Usage example
if __name__ == '__main__':
    client = ProductClient()
    try:
        # Unary call
        client.get_product(1)

        # List products
        client.list_products(page=1, page_size=10)

        # Create product
        client.create_product(
            name="MacBook Pro",
            price=2499.99,
            description="16-inch M3 Max",
            stock=10
        )

        # Watch for updates (blocking)
        # client.watch_product_updates(1)

        # Batch create
        products = [
            {'name': 'Product 1', 'price': 99.99},
            {'name': 'Product 2', 'price': 149.99},
            {'name': 'Product 3', 'price': 199.99},
        ]
        client.batch_create(products)
    finally:
        # Close the channel even if one of the calls above raised.
        client.close()
| Feature | REST | GraphQL | gRPC |
|---|---|---|---|
| Protocol | HTTP/1.1 or HTTP/2 | HTTP/1.1 or HTTP/2 | HTTP/2 |
| Data Format | JSON, XML | JSON | Protocol Buffers (binary) |
| Schema | OpenAPI (optional) | Strongly typed (required) | Protocol Buffers (required) |
| Endpoints | Multiple (one per resource) | Single (/graphql) | Service methods |
| Caching | HTTP caching (GET requests) | Complex (usually POST) | No built-in caching |
| Performance | Good | Good (can be better than REST) | Excellent (binary, HTTP/2) |
| Browser Support | Native | Native | Requires grpc-web |
| Streaming | SSE, WebSockets | Subscriptions (WebSockets) | Native (bidirectional) |
| Learning Curve | Low | Medium | High |
| Best For | Public APIs, CRUD apps | Complex queries, mobile apps | Microservices, real-time |