Log Processing

The document outlines a Django application that processes logs from Graylog, featuring models for log entries and processing status, as well as parsers for different log types. It includes a Graylog client for fetching logs, Celery tasks for processing logs in batch or real-time, and a management command for initiating the log processing. The application is designed to handle various log types, including traffic and VPN logs, and stores both raw and parsed log data in a database.


# models.py
from django.db import models

class LogEntry(models.Model):
    LOG_TYPES = [
        ('traffic', 'Traffic'),
        ('event', 'Event'),
        ('system', 'System'),
        ('security', 'Security'),
    ]

    LOG_LEVELS = [
        ('emergency', 'Emergency'),
        ('alert', 'Alert'),
        ('critical', 'Critical'),
        ('error', 'Error'),
        ('warning', 'Warning'),
        ('notice', 'Notice'),
        ('information', 'Information'),
        ('debug', 'Debug'),
    ]

    # Core fields
    eventtime = models.BigIntegerField()
    timezone = models.CharField(max_length=10, default="+0700")
    logid = models.CharField(max_length=50)
    log_type = models.CharField(max_length=20, choices=LOG_TYPES)
    subtype = models.CharField(max_length=50, blank=True, null=True)
    level = models.CharField(max_length=20, choices=LOG_LEVELS)
    vd = models.CharField(max_length=50, default="root")

    # Network fields (for traffic logs)
    srcip = models.GenericIPAddressField(blank=True, null=True)
    srcport = models.IntegerField(blank=True, null=True)
    srcintf = models.CharField(max_length=50, blank=True, null=True)
    srcintfrole = models.CharField(max_length=50, blank=True, null=True)
    dstip = models.GenericIPAddressField(blank=True, null=True)
    dstport = models.IntegerField(blank=True, null=True)
    dstintf = models.CharField(max_length=50, blank=True, null=True)
    dstintfrole = models.CharField(max_length=50, blank=True, null=True)

    # Location fields
    srccountry = models.CharField(max_length=100, blank=True, null=True)
    dstcountry = models.CharField(max_length=100, blank=True, null=True)

    # Session and protocol fields
    sessionid = models.BigIntegerField(blank=True, null=True)
    proto = models.IntegerField(blank=True, null=True)
    action = models.CharField(max_length=50, blank=True, null=True)
    policyid = models.IntegerField(blank=True, null=True)
    service = models.CharField(max_length=100, blank=True, null=True)

    # Traffic statistics
    duration = models.IntegerField(blank=True, null=True)
    sentbyte = models.BigIntegerField(blank=True, null=True)
    rcvdbyte = models.BigIntegerField(blank=True, null=True)
    sentpkt = models.IntegerField(blank=True, null=True)
    rcvdpkt = models.IntegerField(blank=True, null=True)

    # VPN specific fields
    tunneltype = models.CharField(max_length=50, blank=True, null=True)
    tunnelid = models.BigIntegerField(blank=True, null=True)
    remip = models.GenericIPAddressField(blank=True, null=True)
    tunnelip = models.GenericIPAddressField(blank=True, null=True)
    user = models.CharField(max_length=100, blank=True, null=True)
    group = models.CharField(max_length=100, blank=True, null=True)
    dst_host = models.CharField(max_length=255, blank=True, null=True)
    reason = models.CharField(max_length=255, blank=True, null=True)
    msg = models.TextField(blank=True, null=True)

    # Additional fields
    app = models.CharField(max_length=100, blank=True, null=True)
    appcat = models.CharField(max_length=100, blank=True, null=True)
    trandisp = models.CharField(max_length=50, blank=True, null=True)
    logdesc = models.CharField(max_length=255, blank=True, null=True)

    # Store raw log data and parsed fields
    raw_log = models.TextField()
    # models.JSONField requires Django 3.1+; on older versions use
    # django.contrib.postgres.fields.JSONField instead
    parsed_fields = models.JSONField(default=dict, blank=True)

    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    processed_at = models.DateTimeField(blank=True, null=True)

    class Meta:
        db_table = 'log_entries'
        indexes = [
            models.Index(fields=['eventtime']),
            models.Index(fields=['log_type', 'subtype']),
            models.Index(fields=['srcip']),
            models.Index(fields=['dstip']),
            models.Index(fields=['user']),
            models.Index(fields=['created_at']),
        ]

    def __str__(self):
        return f"{self.logid} - {self.log_type} - {self.level}"

class LogProcessingStatus(models.Model):
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ]

    batch_id = models.CharField(max_length=100, unique=True)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    total_logs = models.IntegerField(default=0)
    processed_logs = models.IntegerField(default=0)
    failed_logs = models.IntegerField(default=0)
    error_message = models.TextField(blank=True, null=True)
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(blank=True, null=True)

    def __str__(self):
        return f"{self.batch_id} - {self.status}"

# parsers.py
import re
import json
from datetime import datetime
from typing import Dict, Any, Optional
import logging

logger = logging.getLogger(__name__)

class LogParser:
    """Base log parser class"""

    @staticmethod
    def parse_log_line(log_line: str) -> Dict[str, Any]:
        """Parse a single log line into key-value pairs"""
        parsed_data = {}

        # Remove newlines and extra spaces
        log_line = log_line.strip()

        # Pattern to match key=value pairs, handling quoted values
        pattern = r'(\w+)=(?:"([^"]*)"|(\S+))'
        matches = re.findall(pattern, log_line)

        for match in matches:
            key = match[0]
            value = match[1] if match[1] else match[2]

            # Convert purely numeric values (including eventtime) to int
            if value.isdigit():
                parsed_data[key] = int(value)
            else:
                parsed_data[key] = value

        # Store original log line
        parsed_data['raw_log'] = log_line

        return parsed_data

    @staticmethod
    def convert_eventtime(eventtime: int) -> datetime:
        """Convert eventtime to datetime object"""
        try:
            # Convert nanoseconds to seconds
            timestamp = eventtime / 1_000_000_000
            return datetime.fromtimestamp(timestamp)
        except (ValueError, TypeError):
            return datetime.now()

class TrafficLogParser(LogParser):
    """Parser specifically for traffic logs"""

    @classmethod
    def parse(cls, log_data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse traffic log specific fields"""
        parsed = {}

        # Map common fields
        field_mapping = {
            'eventtime': 'eventtime',
            'tz': 'timezone',
            'logid': 'logid',
            'type': 'log_type',
            'subtype': 'subtype',
            'level': 'level',
            'vd': 'vd',
            'srcip': 'srcip',
            'srcport': 'srcport',
            'srcintf': 'srcintf',
            'srcintfrole': 'srcintfrole',
            'dstip': 'dstip',
            'dstport': 'dstport',
            'dstintf': 'dstintf',
            'dstintfrole': 'dstintfrole',
            'srccountry': 'srccountry',
            'dstcountry': 'dstcountry',
            'sessionid': 'sessionid',
            'proto': 'proto',
            'action': 'action',
            'policyid': 'policyid',
            'service': 'service',
            'trandisp': 'trandisp',
            'app': 'app',
            'duration': 'duration',
            'sentbyte': 'sentbyte',
            'rcvdbyte': 'rcvdbyte',
            'sentpkt': 'sentpkt',
            'rcvdpkt': 'rcvdpkt',
            'appcat': 'appcat',
        }

        for source_key, target_key in field_mapping.items():
            if source_key in log_data:
                parsed[target_key] = log_data[source_key]

        return parsed

class VPNLogParser(LogParser):
    """Parser specifically for VPN logs"""

    @classmethod
    def parse(cls, log_data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse VPN log specific fields"""
        parsed = {}

        # Map VPN specific fields
        field_mapping = {
            'eventtime': 'eventtime',
            'tz': 'timezone',
            'logid': 'logid',
            'type': 'log_type',
            'subtype': 'subtype',
            'level': 'level',
            'vd': 'vd',
            'logdesc': 'logdesc',
            'action': 'action',
            'tunneltype': 'tunneltype',
            'tunnelid': 'tunnelid',
            'remip': 'remip',
            'tunnelip': 'tunnelip',
            'user': 'user',
            'group': 'group',
            'dst_host': 'dst_host',
            'reason': 'reason',
            'msg': 'msg',
        }

        for source_key, target_key in field_mapping.items():
            if source_key in log_data:
                parsed[target_key] = log_data[source_key]

        return parsed
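
A quick sanity check of the parsers (the FortiGate-style sample line below is invented for illustration):

# example usage (sample line is invented)
sample = ('eventtime=1704085200000000000 tz="+0700" logid="0000000013" type="traffic" '
          'subtype="forward" level="notice" vd="root" srcip=10.0.0.5 srcport=51234 '
          'dstip=8.8.8.8 dstport=53 proto=17 action="accept" sentbyte=120 rcvdbyte=240')

parsed = LogParser.parse_log_line(sample)        # dict of key=value pairs plus 'raw_log'
traffic = TrafficLogParser.parse(parsed)         # only the fields the LogEntry model knows about
event_dt = LogParser.convert_eventtime(parsed['eventtime'])  # nanoseconds -> datetime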

# graylog_client.py
import requests
import json
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class GraylogClient:
    """Client for interacting with Graylog API"""

    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

    def search_logs(self,
                    query: str = "*",
                    time_range: int = 300,
                    limit: int = 100,
                    offset: int = 0) -> Optional[Dict[str, Any]]:
        """Search logs from Graylog"""
        try:
            url = f"{self.base_url}/api/search/universal/relative"
            params = {
                'query': query,
                'range': time_range,
                'limit': limit,
                'offset': offset,
                'sort': 'timestamp:desc'
            }

            response = self.session.get(url, params=params)
            response.raise_for_status()

            return response.json()

        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching logs from Graylog: {e}")
            return None

    def get_recent_logs(self, minutes_back: int = 5) -> List[Dict[str, Any]]:
        """Get logs from the last N minutes"""
        try:
            result = self.search_logs(
                query="*",
                time_range=minutes_back * 60,
                limit=1000
            )

            if result and 'messages' in result:
                return [msg['message'] for msg in result['messages']]

            return []

        except Exception as e:
            logger.error(f"Error getting recent logs: {e}")
            return []

    def stream_logs(self, callback_func, interval: int = 30):
        """Stream logs continuously"""
        import time

        last_timestamp = datetime.now()

        while True:
            try:
                # Get logs since last check
                logs = self.get_recent_logs(minutes_back=1)

                if logs:
                    callback_func(logs)
                    last_timestamp = datetime.now()

                time.sleep(interval)

            except KeyboardInterrupt:
                logger.info("Log streaming stopped by user")
                break
            except Exception as e:
                logger.error(f"Error in log streaming: {e}")
                time.sleep(interval)
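
Typical use of the client looks like this (URL and credentials are placeholders; the 'type:traffic' query assumes that field is extracted and searchable in your Graylog instance):

# example usage (placeholders)
client = GraylogClient('http://graylog.example.com:9000', 'admin', 'secret')

# Raw message payloads from the last 5 minutes
messages = client.get_recent_logs(minutes_back=5)

# Or a filtered search over the last 10 minutes
result = client.search_logs(query='type:traffic', time_range=600, limit=200)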

# tasks.py
from celery import shared_task
from django.utils import timezone
from django.db import transaction
from .models import LogEntry, LogProcessingStatus
from .parsers import LogParser, TrafficLogParser, VPNLogParser
from .graylog_client import GraylogClient
from django.conf import settings
import uuid
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True)
def fetch_and_process_logs(self, minutes_back=5):
    """Fetch logs from Graylog and process them"""
    batch_id = str(uuid.uuid4())

    # Create processing status record
    status = LogProcessingStatus.objects.create(
        batch_id=batch_id,
        status='processing'
    )

    try:
        # Initialize Graylog client
        client = GraylogClient(
            base_url=settings.GRAYLOG_URL,
            username=settings.GRAYLOG_USERNAME,
            password=settings.GRAYLOG_PASSWORD
        )

        # Fetch recent logs
        logs = client.get_recent_logs(minutes_back=minutes_back)

        status.total_logs = len(logs)
        status.save()

        processed_count = 0
        failed_count = 0

        for log_data in logs:
            try:
                # Process each entry synchronously so the counters reflect the real
                # outcome (.delay() returns an AsyncResult, which is always truthy);
                # switch back to process_single_log.delay(...) to fan out per log.
                success = process_single_log(log_data, batch_id)
                if success:
                    processed_count += 1
                else:
                    failed_count += 1

            except Exception as e:
                logger.error(f"Error processing log: {e}")
                failed_count += 1

        # Update status
        status.processed_logs = processed_count
        status.failed_logs = failed_count
        status.status = 'completed'
        status.completed_at = timezone.now()
        status.save()

        return {
            'batch_id': batch_id,
            'total': len(logs),
            'processed': processed_count,
            'failed': failed_count
        }

    except Exception as e:
        logger.error(f"Error in fetch_and_process_logs: {e}")
        status.status = 'failed'
        status.error_message = str(e)
        status.completed_at = timezone.now()
        status.save()
        raise

@shared_task
def process_single_log(log_data, batch_id):
    """Process a single log entry"""
    try:
        # Parse the raw log line if it's a string
        if isinstance(log_data, str):
            parsed_data = LogParser.parse_log_line(log_data)
        else:
            parsed_data = log_data

        # Determine log type and use appropriate parser
        log_type = parsed_data.get('type', 'unknown')

        if log_type == 'traffic':
            processed_data = TrafficLogParser.parse(parsed_data)
        elif log_type == 'event' and parsed_data.get('subtype') == 'vpn':
            processed_data = VPNLogParser.parse(parsed_data)
        else:
            # Generic parsing
            processed_data = parsed_data

        # Create LogEntry
        with transaction.atomic():
            log_entry = LogEntry()

            # Map fields to model
            for field_name, value in processed_data.items():
                if hasattr(log_entry, field_name):
                    setattr(log_entry, field_name, value)

            # Set required fields with defaults
            if not log_entry.raw_log:
                log_entry.raw_log = str(log_data)

            log_entry.parsed_fields = parsed_data
            log_entry.processed_at = timezone.now()

            log_entry.save()

        logger.info(f"Successfully processed log entry: {log_entry.logid}")
        return True

    except Exception as e:
        logger.error(f"Error processing single log: {e}")
        return False

@shared_task
def start_real_time_processing():
    """Start real-time log processing from Graylog"""
    def process_logs_callback(logs):
        """Callback function for processing streamed logs"""
        for log in logs:
            process_single_log.delay(log, f"realtime_{timezone.now().isoformat()}")

    try:
        client = GraylogClient(
            base_url=settings.GRAYLOG_URL,
            username=settings.GRAYLOG_USERNAME,
            password=settings.GRAYLOG_PASSWORD
        )

        # Start streaming logs
        client.stream_logs(process_logs_callback, interval=30)

    except Exception as e:
        logger.error(f"Error in real-time processing: {e}")
        raise
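
For a quick check from a Django shell, the batch task can be queued and its summary dict inspected (a sketch; the timeout is arbitrary and a running worker plus result backend are assumed):

# example: trigger a batch and wait for the summary
from myapp.tasks import fetch_and_process_logs

result = fetch_and_process_logs.delay(minutes_back=10)
print(result.get(timeout=120))  # e.g. {'batch_id': '...', 'total': 42, 'processed': 40, 'failed': 2}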

# management/commands/process_graylog.py
from django.core.management.base import BaseCommand
from django.conf import settings
from myapp.tasks import fetch_and_process_logs, start_real_time_processing

class Command(BaseCommand):
    help = 'Process Graylog logs'

    def add_arguments(self, parser):
        parser.add_argument(
            '--mode',
            type=str,
            choices=['batch', 'realtime'],
            default='batch',
            help='Processing mode: batch or realtime'
        )
        parser.add_argument(
            '--minutes',
            type=int,
            default=5,
            help='Minutes back to fetch logs (batch mode only)'
        )

    def handle(self, *args, **options):
        mode = options['mode']

        if mode == 'batch':
            minutes_back = options['minutes']
            self.stdout.write(f'Starting batch processing for last {minutes_back} minutes...')
            result = fetch_and_process_logs.delay(minutes_back)
            self.stdout.write(f'Task queued: {result.id}')
        elif mode == 'realtime':
            self.stdout.write('Starting real-time processing...')
            start_real_time_processing.delay()
            self.stdout.write('Real-time processing started')

# settings.py additions
"""
Add these to your Django settings.py:

# Graylog Configuration
GRAYLOG_URL = 'http://your-graylog-server:9000'
GRAYLOG_USERNAME = 'your-username'
GRAYLOG_PASSWORD = 'your-password'

# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TIMEZONE = 'Asia/Ho_Chi_Minh'

# Logging Configuration
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': 'graylog_processing.log',
        },
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        'myapp': {
            'handlers': ['file', 'console'],
            'level': 'INFO',
            'propagate': True,
        },
    },
}
"""

# celery.py
"""
Create this file in your project root:

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
"""

# views.py
from datetime import timedelta

from django.db import models
from django.http import JsonResponse
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from .tasks import fetch_and_process_logs
from .models import LogProcessingStatus, LogEntry
import json

@csrf_exempt
@require_http_methods(["POST"])
def trigger_log_processing(request):
    """API endpoint to trigger log processing"""
    try:
        data = json.loads(request.body)
        minutes_back = data.get('minutes_back', 5)

        # Queue the processing task
        task = fetch_and_process_logs.delay(minutes_back)

        return JsonResponse({
            'status': 'success',
            'task_id': task.id,
            'message': f'Log processing queued for last {minutes_back} minutes'
        })

    except Exception as e:
        return JsonResponse({
            'status': 'error',
            'message': str(e)
        }, status=500)

@require_http_methods(["GET"])
def processing_status(request, batch_id):
    """Get processing status for a batch"""
    try:
        status = LogProcessingStatus.objects.get(batch_id=batch_id)
        return JsonResponse({
            'batch_id': status.batch_id,
            'status': status.status,
            'total_logs': status.total_logs,
            'processed_logs': status.processed_logs,
            'failed_logs': status.failed_logs,
            'started_at': status.started_at.isoformat(),
            'completed_at': status.completed_at.isoformat() if status.completed_at else None,
            'error_message': status.error_message
        })
    except LogProcessingStatus.DoesNotExist:
        return JsonResponse({'error': 'Batch not found'}, status=404)

@require_http_methods(["GET"])
def log_stats(request):
    """Get log statistics"""
    from django.db.models import Count

    stats = LogEntry.objects.aggregate(
        total_logs=Count('id'),
        traffic_logs=Count('id', filter=models.Q(log_type='traffic')),
        event_logs=Count('id', filter=models.Q(log_type='event')),
    )

    recent_logs = LogEntry.objects.filter(
        created_at__gte=timezone.now() - timedelta(hours=24)
    ).count()

    stats['recent_24h'] = recent_logs

    return JsonResponse(stats)
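
The curl examples in the usage section below assume routes like these; a minimal urls.py sketch (route paths are assumptions matched to those examples, and 'myapp' mirrors the imports used elsewhere):

# urls.py (sketch)
from django.urls import path
from myapp import views

urlpatterns = [
    path('api/trigger-processing/', views.trigger_log_processing),
    path('api/status/<str:batch_id>/', views.processing_status),
    path('api/stats/', views.log_stats),
]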

# Usage Instructions:
"""
1. Install dependencies:
pip install celery redis requests psycopg2-binary

2. Run migrations:
python manage.py makemigrations
python manage.py migrate

3. Start Celery worker:
celery -A your_project worker -l info

4. Start Celery beat (for periodic tasks):
celery -A your_project beat -l info

5. Process logs:
# Batch processing
python manage.py process_graylog --mode batch --minutes 10

# Real-time processing
python manage.py process_graylog --mode realtime

6. API Usage:
# Trigger processing
curl -X POST http://localhost:8000/api/trigger-processing/ \
-H "Content-Type: application/json" \
-d '{"minutes_back": 10}'

# Check status
curl http://localhost:8000/api/status/{batch_id}/

# Get stats
curl http://localhost:8000/api/stats/
"""

# Quick reference

# Batch processing last 10 minutes
python manage.py process_graylog --mode batch --minutes 10

# Start real-time processing
python manage.py process_graylog --mode realtime

# API trigger
curl -X POST /api/trigger-processing/ -d '{"minutes_back": 5}'
