Django Microservices

This document provides a comprehensive guide on implementing microservices using Django, covering fundamental concepts, architecture patterns, and step-by-step implementation strategies. It includes details on service decomposition, inter-service communication, service discovery, circuit breaker patterns, distributed tracing, and data consistency methods. The guide features code examples to illustrate the practical application of these concepts in a Django environment.

Uploaded by

mukesh.khanijo
Copyright
© All Rights Reserved

DJANGO MICROSERVICES - STEP BY STEP GUIDE

MICROSERVICES FUNDAMENTALS

1. WHAT ARE MICROSERVICES?


Q: Explain the concept of microservices and how they differ from monolithic
applications.
A:
- Microservices are small, independent services that communicate over a network
- Each service has its own database and business logic
- Services are loosely coupled and can be developed/deployed independently
- Each service focuses on a specific business capability
- Communication happens via HTTP APIs, message queues, or gRPC

Example - Monolithic vs Microservices:


```text
# Monolithic Structure
myapp/
├── manage.py
├── myapp/
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   ├── wsgi.py
│   ├── models.py
│   ├── views.py
│   └── apps/
│       ├── users/
│       ├── products/
│       ├── orders/
│       └── payments/

# Microservices Structure
services/
├── user-service/
├── product-service/
├── order-service/
└── payment-service/
```

2. MICROSERVICE ARCHITECTURE PATTERNS


Q: What are the common microservice architecture patterns and when to use them?
A:
- API Gateway Pattern: Single entry point for all client requests
- Service Discovery: Dynamic service registration and discovery
- Circuit Breaker: Handle service failures gracefully
- Event-Driven Architecture: Loose coupling through events
- Saga Pattern: Distributed transactions across services

Example - API Gateway Pattern:


```python
# API Gateway (Django)
from django.http import JsonResponse
import requests


class APIGateway:
    def __init__(self):
        # Base URLs of the downstream services
        self.services = {
            'users': 'http://user-service:8001',
            'products': 'http://product-service:8002',
            'orders': 'http://order-service:8003'
        }

    def route_request(self, service_name, path, method='GET', data=None):
        if service_name not in self.services:
            return JsonResponse({'error': 'Service not found'}, status=404)

        url = f"{self.services[service_name]}{path}"
        try:
            if method == 'GET':
                response = requests.get(url, timeout=5)
            elif method == 'POST':
                response = requests.post(url, json=data, timeout=5)
            else:
                # Without this branch, `response` would be unbound below
                return JsonResponse({'error': 'Method not allowed'}, status=405)
            # safe=False lets list payloads through as well as dicts
            return JsonResponse(response.json(), status=response.status_code,
                                safe=False)
        except (requests.RequestException, ValueError):
            # ValueError covers a non-JSON body from the downstream service
            return JsonResponse({'error': 'Service unavailable'}, status=503)


# Usage in Django view
def user_profile(request, user_id):
    gateway = APIGateway()
    return gateway.route_request('users', f'/api/users/{user_id}/')
```

STEP-BY-STEP MICROSERVICE IMPLEMENTATION

3. SERVICE DECOMPOSITION STRATEGY


Q: How do you decompose a monolithic Django application into microservices?
A:
- Identify bounded contexts (business domains)
- Extract data models and business logic
- Create service interfaces
- Implement data consistency strategies
- Handle cross-service communication

Example - Decomposing User Service:


```python
# Imports used across the snippets below (each file would import only what it needs)
import uuid

from django.contrib.auth.hashers import make_password
from django.db import models
from rest_framework import serializers, viewsets
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response


# Original Monolithic User Model
class User(models.Model):
    username = models.CharField(max_length=100)
    email = models.EmailField()
    password = models.CharField(max_length=128)
    profile = models.OneToOneField('UserProfile', on_delete=models.CASCADE)
    # Direct relations to orders and payments tie the user model to data
    # that other services will own after decomposition
    orders = models.ManyToManyField('Order')
    payments = models.ManyToManyField('Payment')


# Decomposed User Service

# user_service/models.py
class User(models.Model):
    # UUID keys can be shared across services without exposing sequence numbers
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    username = models.CharField(max_length=100, unique=True)
    email = models.EmailField(unique=True)
    password = models.CharField(max_length=128)  # stores the hash, never the raw value
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)


class UserProfile(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    first_name = models.CharField(max_length=100)
    last_name = models.CharField(max_length=100)
    phone = models.CharField(max_length=20)
    address = models.TextField()


# user_service/serializers.py
class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        # The password is deliberately left out so it is never returned
        fields = ['id', 'username', 'email', 'created_at']
        read_only_fields = ['id', 'created_at']


class UserProfileSerializer(serializers.ModelSerializer):
    class Meta:
        model = UserProfile
        fields = '__all__'


# user_service/views.py
class UserViewSet(viewsets.ModelViewSet):
    queryset = User.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAuthenticated]

    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        if serializer.is_valid():
            raw_password = request.data.get('password')
            if not raw_password:
                return Response({'password': ['This field is required.']},
                                status=400)
            # User is a plain models.Model, so it has no set_password();
            # hash the password explicitly before it is stored
            serializer.save(password=make_password(raw_password))
            return Response(serializer.data, status=201)
        return Response(serializer.errors, status=400)

    def retrieve(self, request, pk=None, *args, **kwargs):
        try:
            user = User.objects.get(pk=pk)
            serializer = self.get_serializer(user)
            return Response(serializer.data)
        except User.DoesNotExist:
            return Response({'error': 'User not found'}, status=404)
```

4. INTER-SERVICE COMMUNICATION
Q: How do you implement communication between microservices?
A:
- HTTP REST APIs for synchronous communication
- Message queues (RabbitMQ, Redis) for asynchronous communication
- gRPC for high-performance communication
- Event-driven patterns for loose coupling

Example - HTTP Communication:


```python
# user_service/services.py
import logging

import requests
from django.conf import settings

logger = logging.getLogger(__name__)


class OrderServiceClient:
    def __init__(self):
        self.base_url = settings.ORDER_SERVICE_URL

    def get_user_orders(self, user_id):
        try:
            response = requests.get(
                f"{self.base_url}/api/orders/user/{user_id}/",
                timeout=5,  # never wait indefinitely on another service
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            # Reads degrade gracefully: log and return an empty list
            logger.error(f"Failed to fetch orders for user {user_id}: {e}")
            return []

    def create_order(self, order_data):
        try:
            response = requests.post(f"{self.base_url}/api/orders/",
                                     json=order_data, timeout=5)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            # Writes must not fail silently: log and let the caller decide
            logger.error(f"Failed to create order: {e}")
            raise


# user_service/views.py
from rest_framework.response import Response
from rest_framework.views import APIView

from .models import User
from .serializers import UserSerializer
from .services import OrderServiceClient


class UserDetailView(APIView):
    def get(self, request, user_id):
        # Get user data
        try:
            user = User.objects.get(pk=user_id)
            user_data = UserSerializer(user).data

            # Get user orders from order service
            order_client = OrderServiceClient()
            orders = order_client.get_user_orders(user_id)

            # Combine data
            response_data = {
                'user': user_data,
                'orders': orders
            }
            return Response(response_data)
        except User.DoesNotExist:
            return Response({'error': 'User not found'}, status=404)
```

Example - Message Queue Communication:


```python
# user_service/tasks.py
from celery import Celery

app = Celery('user_service')
app.config_from_object('django.conf:settings', namespace='CELERY')


@app.task
def notify_user_created(user_data):
    """Notify other services when a user is created"""
    message = {
        'event_type': 'user.created',
        'user_id': user_data['id'],
        'email': user_data['email'],
        'timestamp': user_data['created_at']
    }

    # Publish to message queue; send_task addresses tasks by name, so this
    # service does not need to import code from the consuming services
    app.send_task('notification_service.send_welcome_email', args=[message])
    app.send_task('analytics_service.track_user_signup', args=[message])


# user_service/views.py
from django.contrib.auth.hashers import make_password
from rest_framework import viewsets
from rest_framework.response import Response

from .tasks import notify_user_created


class UserViewSet(viewsets.ModelViewSet):
    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        if serializer.is_valid():
            # The User model shown earlier is a plain models.Model without
            # set_password(), so the password is hashed explicitly
            serializer.save(password=make_password(request.data['password']))

            # Notify other services (serializer.data is JSON-friendly)
            user_data = serializer.data
            notify_user_created.delay(user_data)

            return Response(serializer.data, status=201)

        return Response(serializer.errors, status=400)
```

5. SERVICE DISCOVERY AND LOAD BALANCING


Q: How do you implement service discovery and load balancing in microservices?
A:
- Use service registries (Consul, Eureka, etcd)
- Implement health checks
- Use load balancers (Nginx, HAProxy)
- Implement client-side load balancing

Example - Service Discovery with Consul:


```python
# user_service/discovery.py
import consul
from django.conf import settings


class ServiceDiscovery:
    def __init__(self):
        self.consul_client = consul.Consul(
            host=settings.CONSUL_HOST,
            port=settings.CONSUL_PORT
        )

    def register_service(self, service_name, service_id, address, port):
        """Register this service with Consul"""
        self.consul_client.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=address,
            port=port,
            # Consul polls this URL every 10 seconds
            check=consul.Check.http(f"http://{address}:{port}/health/",
                                    interval="10s")
        )

    def discover_service(self, service_name):
        """Discover service instances"""
        # passing=True returns only instances whose health checks succeed
        _, services = self.consul_client.health.service(service_name, passing=True)
        return [service['Service'] for service in services]

    def get_service_url(self, service_name):
        """Get a service URL for making requests"""
        services = self.discover_service(service_name)
        if not services:
            raise RuntimeError(f"No instances found for service: {service_name}")

        # Always picks the first healthy instance, which is not real
        # round-robin (in production, use proper load balancing)
        service = services[0]
        return f"http://{service['Address']}:{service['Port']}"


# user_service/apps.py
from django.apps import AppConfig
from django.conf import settings


class UserServiceConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'user_service'

    def ready(self):
        if not settings.DEBUG:
            # Register service with Consul
            from .discovery import ServiceDiscovery
            discovery = ServiceDiscovery()
            discovery.register_service(
                'user-service',
                'user-service-1',
                settings.SERVICE_HOST,
                settings.SERVICE_PORT
            )


# user_service/health.py
from django.http import JsonResponse
from django.db import connection


def health_check(request):
    """Health check endpoint for service discovery"""
    try:
        # Check database connection
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")

        return JsonResponse({
            'status': 'healthy',
            'service': 'user-service',
            'database': 'connected'
        })
    except Exception as e:
        return JsonResponse({
            'status': 'unhealthy',
            'service': 'user-service',
            'error': str(e)
        }, status=503)
```
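
The `get_service_url` method above always returns the first healthy instance. A minimal sketch of client-side round-robin selection follows, assuming the instance dictionaries carry the `Address` and `Port` keys used above. The `RoundRobinSelector` class, the `to_url` helper and the sample addresses are hypothetical; they are not part of the guide or of any Consul client library.

```python
import itertools
import threading


class RoundRobinSelector:
    """Hypothetical helper: rotates through instances per service name."""

    def __init__(self):
        self._counters = {}
        self._lock = threading.Lock()

    def pick(self, service_name, instances):
        """Return the next instance for service_name, cycling in order."""
        if not instances:
            raise LookupError(f"No instances found for service: {service_name}")
        with self._lock:
            counter = self._counters.setdefault(service_name, itertools.count())
            index = next(counter) % len(instances)
        return instances[index]


def to_url(instance):
    """Build a base URL from a Consul-style service entry."""
    return f"http://{instance['Address']}:{instance['Port']}"


selector = RoundRobinSelector()
instances = [
    {'Address': '10.0.0.1', 'Port': 8001},  # sample data, not real hosts
    {'Address': '10.0.0.2', 'Port': 8001},
]
urls = [to_url(selector.pick('user-service', instances)) for _ in range(3)]
print(urls)
```

Three consecutive picks alternate between the two sample instances and then wrap around. The counter is kept per service name, so rotating one service does not disturb another.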

6. CIRCUIT BREAKER PATTERN


Q: How do you implement the circuit breaker pattern for fault tolerance?
A:
- Monitor service calls and track failures
- Open circuit when failure threshold is reached
- Provide fallback responses
- Automatically close circuit after timeout

Example - Circuit Breaker Implementation:


```python
# user_service/circuit_breaker.py
import time
from enum import Enum
from functools import wraps


class CircuitState(Enum):
    CLOSED = "CLOSED"        # calls pass through normally
    OPEN = "OPEN"            # calls are rejected immediately
    HALF_OPEN = "HALF_OPEN"  # one trial call is allowed through


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open"""


class CircuitBreaker:
    # Note: this sketch keeps state in plain attributes and is not thread-safe
    def __init__(self, failure_threshold=5, recovery_timeout=60,
                 expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                # Recovery timeout elapsed: let a trial call through
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except self.expected_exception:
            self.on_failure()
            raise  # bare raise preserves the original traceback

    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        # A failed trial call reopens the circuit straight away
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN


def circuit_breaker(failure_threshold=5, recovery_timeout=60):
    def decorator(func):
        # One breaker per decorated function, shared by all callers
        breaker = CircuitBreaker(failure_threshold, recovery_timeout)

        @wraps(func)
        def wrapper(*args, **kwargs):
            return breaker.call(func, *args, **kwargs)

        return wrapper
    return decorator


# user_service/services.py
import logging

import requests
from django.conf import settings

from .circuit_breaker import circuit_breaker

logger = logging.getLogger(__name__)


class OrderServiceClient:
    def __init__(self):
        self.base_url = settings.ORDER_SERVICE_URL

    @circuit_breaker(failure_threshold=3, recovery_timeout=30)
    def get_user_orders(self, user_id):
        response = requests.get(f"{self.base_url}/api/orders/user/{user_id}/",
                                timeout=5)
        response.raise_for_status()
        return response.json()

    def get_user_orders_with_fallback(self, user_id):
        try:
            return self.get_user_orders(user_id)
        except Exception as e:
            # Fallback: return cached data or empty list
            logger.warning(f"Order service unavailable, using fallback: {e}")
            return []
```
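
The fallback comment above mentions returning cached data, but the example only returns an empty list. One way to sketch the cached variant is shown below, assuming an in-process dictionary is an acceptable cache (with several worker processes a shared cache such as Redis would be the more likely choice). `LastKnownGoodCache`, `failing_fetch` and the sample keys are hypothetical names for illustration only.

```python
import time


class LastKnownGoodCache:
    """Hypothetical helper: remembers the last successful result per key."""

    def __init__(self, max_age_seconds=300, clock=time.monotonic):
        self.max_age_seconds = max_age_seconds
        self._clock = clock
        self._entries = {}  # key -> (stored_at, value)

    def call(self, key, fetch, default):
        """Run fetch(); on failure return a fresh-enough cached value, else default."""
        try:
            value = fetch()
        except Exception:
            entry = self._entries.get(key)
            if entry and self._clock() - entry[0] <= self.max_age_seconds:
                return entry[1]
            return default
        self._entries[key] = (self._clock(), value)
        return value


def failing_fetch():
    # Stand-in for a call rejected by the circuit breaker or a network error
    raise ConnectionError("order service unavailable")


cache = LastKnownGoodCache(max_age_seconds=60)
first = cache.call('orders:42', lambda: [{'id': 1}], default=[])
second = cache.call('orders:42', failing_fetch, default=[])  # served from cache
third = cache.call('orders:99', failing_fetch, default=[])   # nothing cached yet
print(first, second, third)
```

The `max_age_seconds` limit keeps the fallback from serving arbitrarily old data; how stale is acceptable depends on the data being cached.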

7. DISTRIBUTED TRACING
Q: How do you implement distributed tracing across microservices?
A:
- Use correlation IDs to track requests across services
- Implement structured logging
- Use tracing tools (Jaeger, Zipkin)
- Add trace context to all service calls

Example - Distributed Tracing:


```python
# user_service/middleware.py
import uuid
import logging
from django.utils.deprecation import MiddlewareMixin

logger = logging.getLogger(__name__)


class TracingMiddleware(MiddlewareMixin):
    def process_request(self, request):
        # Get or create correlation ID
        correlation_id = request.headers.get('X-Correlation-ID')
        if not correlation_id:
            correlation_id = str(uuid.uuid4())

        # Add to request
        request.correlation_id = correlation_id

        # Expose the ID to later code that reads the request headers
        # (the response header itself is set in process_response)
        request.META['HTTP_X_CORRELATION_ID'] = correlation_id

        # Log request
        logger.info("Request started", extra={
            'correlation_id': correlation_id,
            'method': request.method,
            'path': request.path,
            'service': 'user-service'
        })

    def process_response(self, request, response):
        # Add correlation ID to response headers
        response['X-Correlation-ID'] = getattr(request, 'correlation_id', '')

        # Log response
        logger.info("Request completed", extra={
            'correlation_id': getattr(request, 'correlation_id', ''),
            'status_code': response.status_code,
            'service': 'user-service'
        })

        return response


# user_service/services.py
import logging

import requests
from django.conf import settings

logger = logging.getLogger(__name__)


class OrderServiceClient:
    def __init__(self):
        self.base_url = settings.ORDER_SERVICE_URL

    def get_user_orders(self, user_id, correlation_id=None):
        # Forward the correlation ID so the order service logs the same ID
        headers = {}
        if correlation_id:
            headers['X-Correlation-ID'] = correlation_id

        logger.info("Calling order service", extra={
            'correlation_id': correlation_id,
            'user_id': user_id,
            'service': 'user-service',
            'target_service': 'order-service'
        })

        try:
            response = requests.get(
                f"{self.base_url}/api/orders/user/{user_id}/",
                headers=headers,
                timeout=5
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            logger.error("Order service call failed", extra={
                'correlation_id': correlation_id,
                'error': str(e),
                'service': 'user-service'
            })
            raise


# user_service/views.py
from rest_framework.response import Response
from rest_framework.views import APIView

from .models import User
from .serializers import UserSerializer
from .services import OrderServiceClient


class UserDetailView(APIView):
    def get(self, request, user_id):
        correlation_id = getattr(request, 'correlation_id', None)

        try:
            user = User.objects.get(pk=user_id)
            user_data = UserSerializer(user).data

            # Pass correlation ID to other services
            order_client = OrderServiceClient()
            orders = order_client.get_user_orders(user_id, correlation_id)

            response_data = {
                'user': user_data,
                'orders': orders
            }
            return Response(response_data)
        except User.DoesNotExist:
            return Response({'error': 'User not found'}, status=404)
```
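
Passing `correlation_id` through every `extra=` dictionary is easy to forget. A minimal sketch of an alternative, using only the standard library, stores the ID in a `contextvars.ContextVar` and lets a `logging.Filter` stamp it on every record. The names `correlation_id_var`, `CorrelationIdFilter` and the logger name `tracing_demo` are hypothetical; in a Django service the middleware would be expected to call `correlation_id_var.set(...)` at the start of each request.

```python
import contextvars
import io
import logging

# Holds the ID for the current request context; '-' means "no request"
correlation_id_var = contextvars.ContextVar('correlation_id', default='-')


class CorrelationIdFilter(logging.Filter):
    """Copy the current correlation ID onto every log record."""

    def filter(self, record):
        record.correlation_id = correlation_id_var.get()
        return True  # never drop the record, only annotate it


# Log into a string buffer so the result is easy to inspect
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter('%(correlation_id)s %(levelname)s %(message)s'))
handler.addFilter(CorrelationIdFilter())

logger = logging.getLogger('tracing_demo')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

token = correlation_id_var.set('abc-123')  # what a middleware would do
logger.info('Calling order service')
correlation_id_var.reset(token)            # restore the previous value
logger.info('Outside any request')

log_output = stream.getvalue()
print(log_output)
```

With this arrangement the call sites log normally and the formatter decides where the ID appears, so a forgotten `extra=` argument no longer produces an untraceable log line.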

8. DATA CONSISTENCY PATTERNS


Q: How do you handle data consistency across microservices?
A:
- Use Saga pattern for distributed transactions
- Implement eventual consistency
- Use event sourcing for audit trails
- Implement compensating transactions

Example - Saga Pattern:


```python
# user_service/sagas.py
import logging
from enum import Enum

import requests
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.utils import timezone

from .models import User

REQUEST_TIMEOUT = 5  # seconds; applied to every call to another service


class SagaStatus(Enum):
    STARTED = "STARTED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    COMPENSATING = "COMPENSATING"


class UserRegistrationSaga:
    def __init__(self, user_data):
        self.user_data = user_data
        self.status = SagaStatus.STARTED
        self.steps = []
        self.logger = logging.getLogger(__name__)

    def execute(self):
        """Execute the saga steps"""
        try:
            # Step 1: Create user
            user = self.create_user()
            self.steps.append(('create_user', user.id))

            # Step 2: Create user profile
            profile = self.create_user_profile(user.id)
            # The profile comes back as parsed JSON (a dict), so the ID is
            # read by key; this assumes the response contains an 'id' field
            self.steps.append(('create_profile', profile['id']))

            # Step 3: Send welcome email
            self.send_welcome_email(user.email)
            self.steps.append(('send_email', user.email))

            # Step 4: Create analytics record
            self.create_analytics_record(user.id)
            self.steps.append(('analytics', user.id))

            self.status = SagaStatus.COMPLETED
            return user

        except Exception as e:
            self.logger.error(f"Saga failed: {e}")
            self.compensate()
            raise

    def compensate(self):
        """Compensate for failed steps"""
        self.status = SagaStatus.COMPENSATING

        # Execute compensation in reverse order
        for step_name, step_data in reversed(self.steps):
            try:
                if step_name == 'create_user':
                    self.delete_user(step_data)
                elif step_name == 'create_profile':
                    self.delete_user_profile(step_data)
                elif step_name == 'send_email':
                    self.send_apology_email(step_data)
                elif step_name == 'analytics':
                    self.delete_analytics_record(step_data)
            except Exception as e:
                # Keep going so one failed compensation does not block the rest
                self.logger.error(f"Compensation failed for {step_name}: {e}")

        self.status = SagaStatus.FAILED

    def create_user(self):
        # Create user in local database; the password is hashed explicitly
        # because the User model is a plain models.Model without set_password()
        return User.objects.create(
            username=self.user_data['username'],
            email=self.user_data['email'],
            password=make_password(self.user_data['password'])
        )

    def create_user_profile(self, user_id):
        # Create profile via API call
        profile_data = {
            'user_id': str(user_id),  # UUID objects are not JSON serializable
            'first_name': self.user_data.get('first_name', ''),
            'last_name': self.user_data.get('last_name', '')
        }

        response = requests.post(
            f"{settings.PROFILE_SERVICE_URL}/api/profiles/",
            json=profile_data,
            timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    def send_welcome_email(self, email):
        # Send email via notification service
        email_data = {
            'to': email,
            'template': 'welcome',
            'context': {'username': self.user_data['username']}
        }
        response = requests.post(
            f"{settings.NOTIFICATION_SERVICE_URL}/api/emails/",
            json=email_data,
            timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()

    def create_analytics_record(self, user_id):
        # Create analytics record
        analytics_data = {
            'user_id': str(user_id),
            'event_type': 'user_registered',
            'timestamp': timezone.now().isoformat()
        }

        response = requests.post(
            f"{settings.ANALYTICS_SERVICE_URL}/api/events/",
            json=analytics_data,
            timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()

    # Compensation methods; raise_for_status() makes a failed compensation
    # visible to the error logging in compensate()
    def delete_user(self, user_id):
        User.objects.filter(id=user_id).delete()

    def delete_user_profile(self, profile_id):
        response = requests.delete(
            f"{settings.PROFILE_SERVICE_URL}/api/profiles/{profile_id}/",
            timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()

    def send_apology_email(self, email):
        email_data = {
            'to': email,
            'template': 'registration_failed',
            'context': {}
        }
        response = requests.post(
            f"{settings.NOTIFICATION_SERVICE_URL}/api/emails/",
            json=email_data,
            timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()

    def delete_analytics_record(self, user_id):
        response = requests.delete(
            f"{settings.ANALYTICS_SERVICE_URL}/api/events/user/{user_id}/",
            timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()


# user_service/views.py
from rest_framework.response import Response
from rest_framework.views import APIView

from .sagas import UserRegistrationSaga


class UserRegistrationView(APIView):
    def post(self, request):
        try:
            # Start saga
            saga = UserRegistrationSaga(request.data)
            user = saga.execute()

            return Response({
                'message': 'User registered successfully',
                'user_id': user.id
            }, status=201)

        except Exception as e:
            return Response({
                'error': 'Registration failed',
                'details': str(e)
            }, status=500)
```
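
The `if/elif` chain in `compensate` has to be kept in sync with `execute` by hand. A sketch of a more generic arrangement is shown below, assuming each step can be expressed as an action paired with its compensation. `SagaStep` and `run_saga` are hypothetical names, and the step functions are stand-ins for the real database and service calls.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class SagaStep:
    name: str
    action: Callable[[], Any]            # performs the step, returns its result
    compensation: Callable[[Any], None]  # undoes the step, given that result


def run_saga(steps: List[SagaStep]) -> List[Any]:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed = []  # (step, result) pairs for steps that succeeded
    try:
        for step in steps:
            result = step.action()
            completed.append((step, result))
    except Exception:
        for step, result in reversed(completed):
            try:
                step.compensation(result)
            except Exception:
                # A failed compensation is skipped here; a real system would
                # log it and retry or flag it for manual repair
                pass
        raise
    return [result for _, result in completed]


log = []


def fail_email():
    # Stand-in for a notification service outage
    raise RuntimeError("notification service down")


steps = [
    SagaStep('create_user', lambda: 'user-1',
             lambda r: log.append(f'delete {r}')),
    SagaStep('create_profile', lambda: 'profile-1',
             lambda r: log.append(f'delete {r}')),
    SagaStep('send_email', fail_email,
             lambda r: log.append('apologize')),
]

try:
    run_saga(steps)
except RuntimeError as exc:
    print('saga failed:', exc)
print(log)
```

Because the third step fails before it completes, only the first two steps are compensated, newest first. Pairing each action with its compensation in one place means a new step cannot be added without also deciding how to undo it.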

PRACTICAL EXERCISES:

1. Complete User Service Implementation:


```python
# user_service/settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'corsheaders',
    'user_service',
]

# Service configuration
SERVICE_NAME = 'user-service'
# SERVICE_HOST is also the address registered with Consul, so outside local
# experiments it should be a hostname other containers can reach
SERVICE_HOST = '0.0.0.0'
SERVICE_PORT = 8001

# Other services
ORDER_SERVICE_URL = 'http://order-service:8003'
PROFILE_SERVICE_URL = 'http://profile-service:8004'
NOTIFICATION_SERVICE_URL = 'http://notification-service:8005'
# The saga example also reads settings.ANALYTICS_SERVICE_URL; the host and
# port below are placeholders, since the guide does not define them
ANALYTICS_SERVICE_URL = 'http://analytics-service:8006'

# Consul configuration
CONSUL_HOST = 'consul'
CONSUL_PORT = 8500

# Celery configuration
CELERY_BROKER_URL = 'redis://redis:6379/0'
CELERY_RESULT_BACKEND = 'redis://redis:6379/0'

# user_service/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import UserViewSet, UserDetailView, UserRegistrationView
from .health import health_check

router = DefaultRouter()
router.register(r'users', UserViewSet)

urlpatterns = [
    # Specific routes come first: the router's users/<pk>/ pattern would
    # otherwise capture users/register/ with pk='register'
    path('api/users/register/', UserRegistrationView.as_view()),
    path('api/users/<uuid:user_id>/detail/', UserDetailView.as_view()),
    path('api/', include(router.urls)),
    path('health/', health_check),
]
```

2. Docker Configuration:
```dockerfile
# user_service/Dockerfile
FROM python:3.9-slim
WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8001

CMD ["gunicorn", "--bind", "0.0.0.0:8001", "user_service.wsgi:application"]


```

```yaml
# docker-compose.yml
version: '3.8'

services:
  user-service:
    build: ./user-service
    ports:
      - "8001:8001"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/user_service
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - postgres
      - redis
      - consul

  order-service:
    build: ./order-service
    ports:
      - "8003:8003"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/order_service

  profile-service:
    build: ./profile-service
    ports:
      - "8004:8004"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/profile_service

  notification-service:
    build: ./notification-service
    ports:
      - "8005:8005"
    environment:
      - SMTP_HOST=smtp.gmail.com
      - SMTP_PORT=587

  postgres:
    image: postgres:13
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=user_service
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:6-alpine

  consul:
    image: consul:1.11
    ports:
      - "8500:8500"
    command: agent -server -bootstrap-expect=1 -ui -client=0.0.0.0

volumes:
  postgres_data:
```

3. API Gateway Implementation:


```python
# gateway/settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'corsheaders',
    'gateway',
]

# gateway/views.py
import json

import requests
from django.http import HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods


class APIGateway:
    def __init__(self):
        self.services = {
            'users': 'http://user-service:8001',
            'orders': 'http://order-service:8003',
            'products': 'http://product-service:8002',
            'payments': 'http://payment-service:8004'
        }

    def route_request(self, service_name, path, method='GET', data=None,
                      headers=None):
        if service_name not in self.services:
            return JsonResponse({'error': 'Service not found'}, status=404)

        url = f"{self.services[service_name]}{path}"

        try:
            if method == 'GET':
                response = requests.get(url, headers=headers, timeout=10)
            elif method == 'POST':
                response = requests.post(url, json=data, headers=headers,
                                         timeout=10)
            elif method == 'PUT':
                response = requests.put(url, json=data, headers=headers,
                                        timeout=10)
            elif method == 'DELETE':
                response = requests.delete(url, headers=headers, timeout=10)
            else:
                # Without this branch, `response` would be unbound below
                return JsonResponse({'error': 'Method not allowed'}, status=405)

            if not response.content:
                # e.g. 204 No Content after a DELETE: nothing to parse
                return HttpResponse(status=response.status_code)
            # safe=False lets list payloads through as well as dicts
            return JsonResponse(response.json(), status=response.status_code,
                                safe=False)

        except (requests.RequestException, ValueError):
            # ValueError covers a non-JSON body from the downstream service
            return JsonResponse({'error': 'Service unavailable'}, status=503)


gateway = APIGateway()


@csrf_exempt
@require_http_methods(["GET", "POST", "PUT", "DELETE"])
def proxy_view(request, service_name, path):
    """Proxy requests to appropriate microservices"""
    method = request.method
    data = None

    if method in ['POST', 'PUT']:
        try:
            data = json.loads(request.body) if request.body else None
        except json.JSONDecodeError:
            return JsonResponse({'error': 'Invalid JSON body'}, status=400)

    headers = dict(request.headers)
    # Remove headers that must be recomputed for the outgoing request
    headers.pop('Content-Length', None)
    headers.pop('Host', None)

    return gateway.route_request(service_name, f'/{path}', method, data, headers)


# gateway/urls.py
from django.urls import re_path
from .views import proxy_view

urlpatterns = [
    re_path(r'^api/(?P<service_name>\w+)/(?P<path>.*)$', proxy_view),
]
```
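
`proxy_view` above drops only `Content-Length` and `Host` before forwarding. HTTP also has hop-by-hop headers, which describe a single connection and are normally not passed on by a proxy. A minimal sketch of a stricter filter follows, assuming the commonly cited hop-by-hop list is sufficient for this gateway; `forwardable_headers` and the sample header values are hypothetical.

```python
# Commonly cited hop-by-hop headers (compared case-insensitively)
HOP_BY_HOP = frozenset({
    'connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization',
    'te', 'trailer', 'transfer-encoding', 'upgrade',
})
# Headers the outgoing HTTP client recomputes for the new request
RECOMPUTED = frozenset({'host', 'content-length'})


def forwardable_headers(headers):
    """Return a copy of headers that is safe to forward to another service."""
    lowered = {name.lower(): value for name, value in headers.items()}
    # Anything named in the Connection header is hop-by-hop as well
    listed = {
        token.strip().lower()
        for token in lowered.get('connection', '').split(',')
        if token.strip()
    }
    blocked = HOP_BY_HOP | RECOMPUTED | listed
    return {
        name: value for name, value in headers.items()
        if name.lower() not in blocked
    }


incoming = {
    'Host': 'gateway.local',
    'Content-Length': '17',
    'Connection': 'keep-alive, X-Internal',
    'Keep-Alive': 'timeout=5',
    'X-Internal': '1',
    'Authorization': 'Bearer token',
    'X-Correlation-ID': 'abc-123',
}
print(forwardable_headers(incoming))
```

In the gateway this would replace the two `headers.pop(...)` calls, for example `headers = forwardable_headers(dict(request.headers))`. End-to-end headers such as `Authorization` and `X-Correlation-ID` still reach the downstream service.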

PERFORMANCE TIPS:
- Use connection pooling for database connections
- Implement caching at multiple levels (Redis, CDN)
- Use async/await for I/O-bound operations
- Implement proper error handling and retries
- Monitor service health and performance
- Use load balancing for high availability
- Implement rate limiting and throttling
- Use message queues for asynchronous processing
- Implement proper logging and monitoring
- Use container orchestration (Kubernetes) for production
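
One of the tips above is to implement retries. A minimal sketch of retrying with exponential backoff and jitter is shown below, assuming the failing call raises `ConnectionError` or `TimeoutError` for transient problems (with `requests`, the corresponding exception classes would be passed in `retry_on`). `retry_with_backoff` and `flaky` are hypothetical names, and the delay values are illustrative defaults rather than recommendations.

```python
import random
import time


def retry_with_backoff(func, max_attempts=4, base_delay=0.5, max_delay=8.0,
                       retry_on=(ConnectionError, TimeoutError),
                       sleep=time.sleep):
    """Call func(); on a retryable error wait, then try again.

    The wait before retry n is a random value between 0 and
    min(max_delay, base_delay * 2**n), so clients do not retry in lockstep.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))


calls = {'count': 0}


def flaky():
    # Stand-in for a service call that fails twice, then succeeds
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError("temporary failure")
    return 'ok'


delays = []
# Passing delays.append as `sleep` records the waits instead of sleeping
result = retry_with_backoff(flaky, sleep=delays.append)
print(result, calls['count'], len(delays))
```

Retries suit idempotent calls such as reads; repeating a non-idempotent write (for example creating an order) can duplicate it unless the receiving service deduplicates requests.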
