# models.py
from django.db import models


class LogEntry(models.Model):
    LOG_TYPES = [
        ('traffic', 'Traffic'),
        ('event', 'Event'),
        ('system', 'System'),
        ('security', 'Security'),
    ]

    LOG_LEVELS = [
        ('emergency', 'Emergency'),
        ('alert', 'Alert'),
        ('critical', 'Critical'),
        ('error', 'Error'),
        ('warning', 'Warning'),
        ('notice', 'Notice'),
        ('information', 'Information'),
        ('debug', 'Debug'),
    ]

    # Core fields
    eventtime = models.BigIntegerField()
    timezone = models.CharField(max_length=10, default="+0700")
    logid = models.CharField(max_length=50)
    log_type = models.CharField(max_length=20, choices=LOG_TYPES)
    subtype = models.CharField(max_length=50, blank=True, null=True)
    level = models.CharField(max_length=20, choices=LOG_LEVELS)
    vd = models.CharField(max_length=50, default="root")

    # Network fields (for traffic logs)
    srcip = models.GenericIPAddressField(blank=True, null=True)
    srcport = models.IntegerField(blank=True, null=True)
    srcintf = models.CharField(max_length=50, blank=True, null=True)
    srcintfrole = models.CharField(max_length=50, blank=True, null=True)
    dstip = models.GenericIPAddressField(blank=True, null=True)
    dstport = models.IntegerField(blank=True, null=True)
    dstintf = models.CharField(max_length=50, blank=True, null=True)
    dstintfrole = models.CharField(max_length=50, blank=True, null=True)

    # Location fields
    srccountry = models.CharField(max_length=100, blank=True, null=True)
    dstcountry = models.CharField(max_length=100, blank=True, null=True)

    # Session and protocol fields
    sessionid = models.BigIntegerField(blank=True, null=True)
    proto = models.IntegerField(blank=True, null=True)
    action = models.CharField(max_length=50, blank=True, null=True)
    policyid = models.IntegerField(blank=True, null=True)
    service = models.CharField(max_length=100, blank=True, null=True)

    # Traffic statistics
    duration = models.IntegerField(blank=True, null=True)
    sentbyte = models.BigIntegerField(blank=True, null=True)
    rcvdbyte = models.BigIntegerField(blank=True, null=True)
    sentpkt = models.IntegerField(blank=True, null=True)
    rcvdpkt = models.IntegerField(blank=True, null=True)

    # VPN specific fields
    tunneltype = models.CharField(max_length=50, blank=True, null=True)
    tunnelid = models.BigIntegerField(blank=True, null=True)
    remip = models.GenericIPAddressField(blank=True, null=True)
    tunnelip = models.GenericIPAddressField(blank=True, null=True)
    user = models.CharField(max_length=100, blank=True, null=True)
    group = models.CharField(max_length=100, blank=True, null=True)
    dst_host = models.CharField(max_length=255, blank=True, null=True)
    reason = models.CharField(max_length=255, blank=True, null=True)
    msg = models.TextField(blank=True, null=True)

    # Additional fields
    app = models.CharField(max_length=100, blank=True, null=True)
    appcat = models.CharField(max_length=100, blank=True, null=True)
    trandisp = models.CharField(max_length=50, blank=True, null=True)
    logdesc = models.CharField(max_length=255, blank=True, null=True)

    # Store raw log data and parsed fields
    raw_log = models.TextField()
    # models.JSONField (Django 3.1+) replaces the removed
    # django.contrib.postgres.fields.JSONField import
    parsed_fields = models.JSONField(default=dict, blank=True)

    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    processed_at = models.DateTimeField(blank=True, null=True)

    class Meta:
        db_table = 'log_entries'
        indexes = [
            models.Index(fields=['eventtime']),
            models.Index(fields=['log_type', 'subtype']),
            models.Index(fields=['srcip']),
            models.Index(fields=['dstip']),
            models.Index(fields=['user']),
            models.Index(fields=['created_at']),
        ]

    def __str__(self):
        return f"{self.logid} - {self.log_type} - {self.level}"


class LogProcessingStatus(models.Model):
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ]

    batch_id = models.CharField(max_length=100, unique=True)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    total_logs = models.IntegerField(default=0)
    processed_logs = models.IntegerField(default=0)
    failed_logs = models.IntegerField(default=0)
    error_message = models.TextField(blank=True, null=True)
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(blank=True, null=True)

    def __str__(self):
        return f"{self.batch_id} - {self.status}"
# parsers.py
import re
from datetime import datetime
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)


class LogParser:
    """Base log parser class"""

    @staticmethod
    def parse_log_line(log_line: str) -> Dict[str, Any]:
        """Parse a single log line into key-value pairs"""
        parsed_data = {}

        # Remove newlines and extra spaces
        log_line = log_line.strip()

        # Pattern to match key=value pairs, handling quoted values
        pattern = r'(\w+)=(?:"([^"]*)"|(\S+))'
        matches = re.findall(pattern, log_line)

        for match in matches:
            key = match[0]
            value = match[1] if match[1] else match[2]

            # Convert purely numeric values (eventtime included) to int
            if value.isdigit():
                parsed_data[key] = int(value)
            else:
                parsed_data[key] = value

        # Store original log line
        parsed_data['raw_log'] = log_line
        return parsed_data

    @staticmethod
    def convert_eventtime(eventtime: int) -> datetime:
        """Convert a nanosecond eventtime to a datetime object"""
        try:
            # Convert nanoseconds to seconds
            timestamp = eventtime / 1_000_000_000
            return datetime.fromtimestamp(timestamp)
        except (ValueError, TypeError):
            return datetime.now()


class TrafficLogParser(LogParser):
    """Parser specifically for traffic logs"""

    @classmethod
    def parse(cls, log_data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse traffic log specific fields"""
        parsed = {}

        # Map common fields
        field_mapping = {
            'eventtime': 'eventtime',
            'tz': 'timezone',
            'logid': 'logid',
            'type': 'log_type',
            'subtype': 'subtype',
            'level': 'level',
            'vd': 'vd',
            'srcip': 'srcip',
            'srcport': 'srcport',
            'srcintf': 'srcintf',
            'srcintfrole': 'srcintfrole',
            'dstip': 'dstip',
            'dstport': 'dstport',
            'dstintf': 'dstintf',
            'dstintfrole': 'dstintfrole',
            'srccountry': 'srccountry',
            'dstcountry': 'dstcountry',
            'sessionid': 'sessionid',
            'proto': 'proto',
            'action': 'action',
            'policyid': 'policyid',
            'service': 'service',
            'trandisp': 'trandisp',
            'app': 'app',
            'duration': 'duration',
            'sentbyte': 'sentbyte',
            'rcvdbyte': 'rcvdbyte',
            'sentpkt': 'sentpkt',
            'rcvdpkt': 'rcvdpkt',
            'appcat': 'appcat',
        }

        for source_key, target_key in field_mapping.items():
            if source_key in log_data:
                parsed[target_key] = log_data[source_key]
        return parsed


class VPNLogParser(LogParser):
    """Parser specifically for VPN logs"""

    @classmethod
    def parse(cls, log_data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse VPN log specific fields"""
        parsed = {}

        # Map VPN specific fields
        field_mapping = {
            'eventtime': 'eventtime',
            'tz': 'timezone',
            'logid': 'logid',
            'type': 'log_type',
            'subtype': 'subtype',
            'level': 'level',
            'vd': 'vd',
            'logdesc': 'logdesc',
            'action': 'action',
            'tunneltype': 'tunneltype',
            'tunnelid': 'tunnelid',
            'remip': 'remip',
            'tunnelip': 'tunnelip',
            'user': 'user',
            'group': 'group',
            'dst_host': 'dst_host',
            'reason': 'reason',
            'msg': 'msg',
        }

        for source_key, target_key in field_mapping.items():
            if source_key in log_data:
                parsed[target_key] = log_data[source_key]
        return parsed
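
# Parser usage example (illustrative)
"""
A quick sanity check of the parsing chain, assuming FortiGate-style
key=value input (the sample line below is made up):

from myapp.parsers import LogParser, TrafficLogParser

line = ('eventtime=1700000000000000000 tz="+0700" logid="0000000013" '
        'type="traffic" srcip=10.0.0.5 dstip=8.8.8.8 action="accept" '
        'sentbyte=1200 rcvdbyte=3400')

raw = LogParser.parse_log_line(line)        # flat dict of all key/value pairs
fields = TrafficLogParser.parse(raw)        # only the keys LogEntry has columns for
when = LogParser.convert_eventtime(raw['eventtime'])  # nanoseconds -> datetime
"""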
# graylog_client.py
import requests
import time
from typing import List, Dict, Any, Optional
import logging

logger = logging.getLogger(__name__)


class GraylogClient:
    """Client for interacting with the Graylog REST API"""

    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

    def search_logs(self,
                    query: str = "*",
                    time_range: int = 300,
                    limit: int = 100,
                    offset: int = 0) -> Optional[Dict[str, Any]]:
        """Search logs from Graylog"""
        try:
            url = f"{self.base_url}/api/search/universal/relative"
            params = {
                'query': query,
                'range': time_range,
                'limit': limit,
                'offset': offset,
                'sort': 'timestamp:desc'
            }
            response = self.session.get(url, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching logs from Graylog: {e}")
            return None

    def get_recent_logs(self, minutes_back: int = 5) -> List[Dict[str, Any]]:
        """Get logs from the last N minutes"""
        try:
            result = self.search_logs(
                query="*",
                time_range=minutes_back * 60,
                limit=1000
            )
            if result and 'messages' in result:
                return [msg['message'] for msg in result['messages']]
            return []
        except Exception as e:
            logger.error(f"Error getting recent logs: {e}")
            return []

    def stream_logs(self, callback_func, interval: int = 30):
        """Poll Graylog continuously and hand each batch to a callback.

        Note: every cycle re-fetches the last minute of logs, so overlapping
        polls can deliver the same message more than once; deduplicate
        downstream if that matters.
        """
        while True:
            try:
                # Get logs from the last minute
                logs = self.get_recent_logs(minutes_back=1)
                if logs:
                    callback_func(logs)
                time.sleep(interval)
            except KeyboardInterrupt:
                logger.info("Log streaming stopped by user")
                break
            except Exception as e:
                logger.error(f"Error in log streaming: {e}")
                time.sleep(interval)
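
# Client usage example (illustrative)
"""
A minimal sketch of using the client outside Celery, e.g. from a shell or a
one-off script (URL, credentials, and the query string are placeholders):

from myapp.graylog_client import GraylogClient

client = GraylogClient('http://your-graylog-server:9000', 'your-username', 'your-password')

# Everything Graylog indexed in the last 10 minutes
messages = client.get_recent_logs(minutes_back=10)

# Or a narrower query against the same relative-search endpoint
result = client.search_logs(query='action:deny', time_range=600, limit=200)
"""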
# tasks.py
from celery import shared_task
from django.utils import timezone
from django.db import transaction
from django.conf import settings
from .models import LogEntry, LogProcessingStatus
from .parsers import LogParser, TrafficLogParser, VPNLogParser
from .graylog_client import GraylogClient
import uuid
import logging

logger = logging.getLogger(__name__)


@shared_task(bind=True)
def fetch_and_process_logs(self, minutes_back=5):
    """Fetch logs from Graylog and process them"""
    batch_id = str(uuid.uuid4())

    # Create processing status record
    status = LogProcessingStatus.objects.create(
        batch_id=batch_id,
        status='processing'
    )

    try:
        # Initialize Graylog client
        client = GraylogClient(
            base_url=settings.GRAYLOG_URL,
            username=settings.GRAYLOG_USERNAME,
            password=settings.GRAYLOG_PASSWORD
        )

        # Fetch recent logs
        logs = client.get_recent_logs(minutes_back=minutes_back)
        status.total_logs = len(logs)
        status.save()

        processed_count = 0
        failed_count = 0

        for log_data in logs:
            try:
                # Process each entry synchronously so the boolean result is
                # meaningful; .delay() returns an AsyncResult, which is always
                # truthy and would skew the counters.
                success = process_single_log(log_data, batch_id)
                if success:
                    processed_count += 1
                else:
                    failed_count += 1
            except Exception as e:
                logger.error(f"Error processing log: {e}")
                failed_count += 1

        # Update status
        status.processed_logs = processed_count
        status.failed_logs = failed_count
        status.status = 'completed'
        status.completed_at = timezone.now()
        status.save()

        return {
            'batch_id': batch_id,
            'total': len(logs),
            'processed': processed_count,
            'failed': failed_count
        }
    except Exception as e:
        logger.error(f"Error in fetch_and_process_logs: {e}")
        status.status = 'failed'
        status.error_message = str(e)
        status.completed_at = timezone.now()
        status.save()
        raise


@shared_task
def process_single_log(log_data, batch_id):
    """Process a single log entry"""
    try:
        # Parse the raw log line if it's a string
        if isinstance(log_data, str):
            parsed_data = LogParser.parse_log_line(log_data)
        else:
            parsed_data = log_data

        # Determine log type and use the appropriate parser
        log_type = parsed_data.get('type', 'unknown')
        if log_type == 'traffic':
            processed_data = TrafficLogParser.parse(parsed_data)
        elif log_type == 'event' and parsed_data.get('subtype') == 'vpn':
            processed_data = VPNLogParser.parse(parsed_data)
        else:
            # Generic parsing
            processed_data = parsed_data

        # Create LogEntry
        with transaction.atomic():
            log_entry = LogEntry()

            # Map fields to model attributes
            for field_name, value in processed_data.items():
                if hasattr(log_entry, field_name):
                    setattr(log_entry, field_name, value)

            # Set required fields with defaults
            if not log_entry.raw_log:
                log_entry.raw_log = str(log_data)
            log_entry.parsed_fields = parsed_data
            log_entry.processed_at = timezone.now()
            log_entry.save()

        logger.info(f"Successfully processed log entry: {log_entry.logid}")
        return True
    except Exception as e:
        logger.error(f"Error processing single log: {e}")
        return False


@shared_task
def start_real_time_processing():
    """Start real-time log processing from Graylog"""

    def process_logs_callback(logs):
        """Callback function for processing streamed logs"""
        for log in logs:
            process_single_log.delay(log, f"realtime_{timezone.now().isoformat()}")

    try:
        client = GraylogClient(
            base_url=settings.GRAYLOG_URL,
            username=settings.GRAYLOG_USERNAME,
            password=settings.GRAYLOG_PASSWORD
        )
        # Start streaming logs (this blocks; run it on a dedicated worker)
        client.stream_logs(process_logs_callback, interval=30)
    except Exception as e:
        logger.error(f"Error in real-time processing: {e}")
        raise
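
# Task trigger example (illustrative)
"""
A minimal sketch of queueing the batch task from a Django shell and checking
the status record it creates (assumes a Celery worker is running):

from myapp.tasks import fetch_and_process_logs
from myapp.models import LogProcessingStatus

async_result = fetch_and_process_logs.delay(minutes_back=10)
summary = async_result.get(timeout=120)   # {'batch_id': ..., 'processed': ..., ...}

status = LogProcessingStatus.objects.get(batch_id=summary['batch_id'])
print(status.status, status.processed_logs, status.failed_logs)
"""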
# management/commands/process_graylog.py
from django.core.management.base import BaseCommand
from myapp.tasks import fetch_and_process_logs, start_real_time_processing


class Command(BaseCommand):
    help = 'Process Graylog logs'

    def add_arguments(self, parser):
        parser.add_argument(
            '--mode',
            type=str,
            choices=['batch', 'realtime'],
            default='batch',
            help='Processing mode: batch or realtime'
        )
        parser.add_argument(
            '--minutes',
            type=int,
            default=5,
            help='Minutes back to fetch logs (batch mode only)'
        )

    def handle(self, *args, **options):
        mode = options['mode']

        if mode == 'batch':
            minutes_back = options['minutes']
            self.stdout.write(
                f'Starting batch processing for last {minutes_back} minutes...'
            )
            result = fetch_and_process_logs.delay(minutes_back)
            self.stdout.write(f'Task queued: {result.id}')
        elif mode == 'realtime':
            self.stdout.write('Starting real-time processing...')
            start_real_time_processing.delay()
            self.stdout.write('Real-time processing started')
# settings.py additions
"""
Add these to your Django settings.py:

# Graylog Configuration
GRAYLOG_URL = 'http://your-graylog-server:9000'
GRAYLOG_USERNAME = 'your-username'
GRAYLOG_PASSWORD = 'your-password'

# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TIMEZONE = 'Asia/Ho_Chi_Minh'

# Logging Configuration
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': 'graylog_processing.log',
        },
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        'myapp': {
            'handlers': ['file', 'console'],
            'level': 'INFO',
            'propagate': True,
        },
    },
}
"""
# celery.py
"""
Create this file as your_project/celery.py (next to settings.py):

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Also add to your_project/__init__.py so the app loads with Django:

from .celery import app as celery_app
"""
# views.py
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.db.models import Count, Q
from django.utils import timezone
from datetime import timedelta
from .tasks import fetch_and_process_logs
from .models import LogProcessingStatus, LogEntry
import json


@csrf_exempt
@require_http_methods(["POST"])
def trigger_log_processing(request):
    """API endpoint to trigger log processing"""
    try:
        data = json.loads(request.body)
        minutes_back = data.get('minutes_back', 5)

        # Queue the processing task
        task = fetch_and_process_logs.delay(minutes_back)

        return JsonResponse({
            'status': 'success',
            'task_id': task.id,
            'message': f'Log processing queued for last {minutes_back} minutes'
        })
    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        }, status=500)


@require_http_methods(["GET"])
def processing_status(request, batch_id):
    """Get processing status for a batch"""
    try:
        status = LogProcessingStatus.objects.get(batch_id=batch_id)
        return JsonResponse({
            'batch_id': status.batch_id,
            'status': status.status,
            'total_logs': status.total_logs,
            'processed_logs': status.processed_logs,
            'failed_logs': status.failed_logs,
            'started_at': status.started_at.isoformat(),
            'completed_at': status.completed_at.isoformat() if status.completed_at else None,
            'error_message': status.error_message
        })
    except LogProcessingStatus.DoesNotExist:
        return JsonResponse({'error': 'Batch not found'}, status=404)


@require_http_methods(["GET"])
def log_stats(request):
    """Get log statistics"""
    stats = LogEntry.objects.aggregate(
        total_logs=Count('id'),
        traffic_logs=Count('id', filter=Q(log_type='traffic')),
        event_logs=Count('id', filter=Q(log_type='event')),
    )
    recent_logs = LogEntry.objects.filter(
        created_at__gte=timezone.now() - timedelta(hours=24)
    ).count()
    stats['recent_24h'] = recent_logs
    return JsonResponse(stats)
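
# urls.py (illustrative)
"""
The curl examples below assume these routes exist; a minimal sketch of the
URL wiring (app and project names are placeholders):

# myapp/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('api/trigger-processing/', views.trigger_log_processing),
    path('api/status/<str:batch_id>/', views.processing_status),
    path('api/stats/', views.log_stats),
]

# your_project/urls.py
from django.urls import include, path

urlpatterns = [
    path('', include('myapp.urls')),
]
"""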
# Usage Instructions:
"""
1. Install dependencies:
   pip install celery redis requests psycopg2-binary

2. Run migrations:
   python manage.py makemigrations
   python manage.py migrate

3. Start Celery worker:
   celery -A your_project worker -l info

4. Start Celery beat (for periodic tasks):
   celery -A your_project beat -l info

5. Process logs:
   # Batch processing
   python manage.py process_graylog --mode batch --minutes 10

   # Real-time processing
   python manage.py process_graylog --mode realtime

6. API Usage:
   # Trigger processing
   curl -X POST http://localhost:8000/api/trigger-processing/ \
     -H "Content-Type: application/json" \
     -d '{"minutes_back": 10}'

   # Check status
   curl http://localhost:8000/api/status/{batch_id}/

   # Get stats
   curl http://localhost:8000/api/stats/
"""