-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcursor.py
136 lines (108 loc) · 4.39 KB
/
cursor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import datetime
import json
import time
from django.core.serializers.json import DjangoJSONEncoder
from django.utils.encoding import force_str
from bx_django_utils.stacktrace import get_stacktrace
try:
from psycopg2._json import Json as PostgresJson
except ImportError:
PostgresJson = None
class RecordingCursorWrapper:
    """
    An implementation of django.db.backends.utils.CursorWrapper.

    It exposes the same public interface and delegates every call to the
    wrapped cursor. After a call made through callproc(), execute() or
    executemany() has run (successfully or not), the statement, its
    parameters, the duration, a stacktrace and optionally the EXPLAIN
    output are handed to the provided logger.
    """

    def __init__(
        self,
        cursor,
        db,
        logger,
        collect_stacktrace=None,
        query_explain: bool = False,  # Capture EXPLAIN SQL information?
    ):
        """
        :param cursor: the cursor object every call is delegated to
        :param db: database wrapper; its ``ops``, ``connection`` and
            (optionally) ``alias`` attributes are read while recording
        :param logger: object that must implement a ``record`` method
        :param collect_stacktrace: optional zero-argument callable used to
            collect the stacktrace; defaults to ``get_stacktrace``
        :param query_explain: if true, EXPLAIN output is captured for
            statements run through ``execute()``
        """
        self.cursor = cursor
        self.db = db
        self.logger = logger  # must implement 'record' method

        if collect_stacktrace is None:
            self.get_stacktrace = get_stacktrace
        else:
            self.get_stacktrace = collect_stacktrace

        self.query_explain = query_explain

    def __getattr__(self, attr):
        """Delegate unknown attributes to the wrapped cursor."""
        # __getattr__ is only called when normal lookup failed. If 'cursor'
        # itself is missing (instance created without __init__, e.g. by copy
        # or unpickling) reading self.cursor below would re-enter this method
        # forever, so fail with a regular AttributeError instead.
        if attr == 'cursor':
            raise AttributeError(attr)
        return getattr(self.cursor, attr)

    def __iter__(self):
        return iter(self.cursor)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # close() is not defined on this class, so it is resolved on the
        # wrapped cursor via __getattr__.
        self.close()

    def _quote_expr(self, element):
        """Return *element* as SQL-like literal text: strings are single-quoted
        (embedded quotes doubled), everything else uses its repr()."""
        if isinstance(element, str):
            return "'%s'" % element.replace("'", "''")
        else:
            return repr(element)

    def _quote_params(self, params):
        """Quote every parameter with _quote_expr().

        Falsy *params* (None, empty containers) are returned unchanged,
        a dict yields a dict, any other iterable yields a list.
        """
        if not params:
            return params
        if isinstance(params, dict):
            return {key: self._quote_expr(value) for key, value in params.items()}
        return [self._quote_expr(p) for p in params]

    def _decode(self, param):
        """Convert *param* into a structure that is suitable for json.dumps()."""
        if PostgresJson and isinstance(param, PostgresJson):
            return param.dumps(param.adapted)

        # decode each item separately for data containers
        if isinstance(param, (tuple, list)):
            return [self._decode(element) for element in param]
        if isinstance(param, dict):
            return {key: self._decode(value) for key, value in param.items()}

        # make sure datetime, date and time are converted to string by force_str
        CONVERT_TYPES = (datetime.datetime, datetime.date, datetime.time)
        try:
            return force_str(param, strings_only=not isinstance(param, CONVERT_TYPES))
        except UnicodeDecodeError:
            # Undecodable value: fall back to its repr()
            return repr(param)

    def _explain(self, sql, params):
        """Run the backend's EXPLAIN on the statement; return the rows as a list of strings."""
        explain_prefix = self.db.ops.explain_query_prefix()
        self.cursor.execute(f'{explain_prefix} {sql}', params)
        result = self.cursor.fetchall()

        # Convert the result in the same way as Django, see: SQLCompiler.explain_query():
        return [
            row if isinstance(row, str) else ' '.join(str(c) for c in row)
            for row in result
        ]

    def _record(self, method, sql, params, allow_explain=True):
        """
        Call ``method(sql, params)``, return its result and record the call.

        The record is written in a ``finally`` block, so it is also written
        when *method* raises; the exception then propagates to the caller.

        :param method: bound method of the wrapped cursor to invoke
        :param sql: statement (or procedure name) passed to *method*
        :param params: parameters passed to *method*
        :param allow_explain: set to False for calls whose *sql*/*params* do
            not form a single statement that can be prefixed with EXPLAIN
        """
        explain = None
        if self.query_explain and allow_explain:
            explain = self._explain(sql, params)

        start = time.monotonic()
        try:
            return method(sql, params)
        finally:
            stop = time.monotonic()
            duration = (stop - start) * 1000  # seconds -> milliseconds

            try:
                _params_decoded = json.dumps(self._decode(params), cls=DjangoJSONEncoder)
            except TypeError:
                _params_decoded = ''  # object not JSON serializable, we have to live with that

            sql = str(sql)  # is sometimes an object, e.g. psycopg Composed, so ensure string

            stacktrace = self.get_stacktrace()

            # NOTE(review): 'vendor' is looked up on self.db.connection, not on
            # self.db itself, and falls back to 'unknown' -- confirm this is
            # the object that is expected to carry the attribute.
            self.logger.record(
                alias=getattr(self.db, 'alias', 'default'),
                vendor=getattr(self.db.connection, 'vendor', 'unknown'),
                raw_sql=sql,
                sql=self.db.ops.last_executed_query(self.cursor, sql, self._quote_params(params)),
                raw_params=params,
                params=_params_decoded,
                duration=duration,
                stacktrace=stacktrace,
                explain=explain,
            )

    def callproc(self, procname, params=None):
        # A procedure name is not a statement: "EXPLAIN <procname>" would fail.
        return self._record(self.cursor.callproc, procname, params, allow_explain=False)

    def execute(self, sql, params=None):
        return self._record(self.cursor.execute, sql, params)

    def executemany(self, sql, param_list):
        # param_list holds many parameter sets and cannot be bound to one EXPLAIN call.
        return self._record(self.cursor.executemany, sql, param_list, allow_explain=False)