-
Notifications
You must be signed in to change notification settings - Fork 8.3k
Expand file tree
/
Copy pathSystemLog.h
More file actions
251 lines (202 loc) · 11.4 KB
/
SystemLog.h
File metadata and controls
251 lines (202 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#pragma once
#include "config.h"
#include <Interpreters/StorageID.h>
#include <Common/SystemLogBase.h>
#include <Common/Exception.h>
#include <Parsers/IAST.h>
#include <Parsers/IParserBase.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/CommonParsers.h>
#include <Interpreters/SystemLogFlushPolicy.h>
#include <boost/noncopyable.hpp>
/// X-macro listing every system log that is declared in all builds.
/// Each entry has the shape M(log_type, member, descr):
///   log_type - name of the log class (forward-declared further down in this header);
///   member   - name of the corresponding data member of SystemLogs;
///   descr    - human-readable description string of the log table.
/// The last entry deliberately carries no trailing line continuation: a dangling backslash
/// would splice whatever line follows into this macro's definition.
/// NOTE(review): the description of azure_queue_log mentions S3Queue and looks copy-pasted from
/// s3queue_log. The strings are kept byte-identical here; confirm how descriptions are consumed
/// before rewording any of them.
#define LIST_OF_ALL_SYSTEM_LOGS(M) \
    M(QueryLog, query_log, "Contains information about executed queries, for example, start time, duration of processing, error messages.") \
    M(QueryThreadLog, query_thread_log, "Contains information about threads that execute queries, for example, thread name, thread start time, duration of query processing.") \
    M(PartLog, part_log, "This table contains information about events that occurred with data parts in the MergeTree family tables, such as adding or merging data.") \
    M(BackgroundSchedulePoolLog, background_schedule_pool_log, "Contains history of background schedule pool task executions.") \
    M(TraceLog, trace_log, "Contains stack traces collected by the sampling query profiler.") \
    M(CrashLog, crash_log, "Contains information about stack traces for fatal errors. The table does not exist in the database by default, it is created only when fatal errors occur.") \
    M(TextLog, text_log, "Contains logging entries which are normally written to a log file or to stdout.") \
    M(MetricLog, metric_log, "Contains history of metrics values from tables system.metrics and system.events, periodically flushed to disk.") \
    M(TransposedMetricLog, transposed_metric_log, "Contains history of metrics values from tables system.metrics and system.events. Periodically flushed to disk. Transposed form of system.metric_log.") \
    M(ErrorLog, error_log, "Contains history of error values from table system.errors, periodically flushed to disk.") \
    M(FilesystemCacheLog, filesystem_cache_log, "Contains a history of all events occurred with filesystem cache for objects on a remote filesystem.") \
    M(FilesystemReadPrefetchesLog, filesystem_read_prefetches_log, "Contains a history of all prefetches done during reading from MergeTables backed by a remote filesystem.") \
    M(ObjectStorageQueueLog, s3queue_log, "Contains logging entries with the information files processes by S3Queue engine.") \
    M(ObjectStorageQueueLog, azure_queue_log, "Contains logging entries with the information files processes by S3Queue engine.") \
    M(AsynchronousMetricLog, asynchronous_metric_log, "Contains the historical values for system.asynchronous_metrics, once per time interval (one second by default).") \
    M(OpenTelemetrySpanLog, opentelemetry_span_log, "Contains information about trace spans for executed queries.") \
    M(QueryViewsLog, query_views_log, "Contains information about the dependent views executed when running a query, for example, the view type or the execution time.") \
    M(ZooKeeperLog, zookeeper_log, "This table contains information about the parameters of the request to the ZooKeeper server and the response from it.") \
    M(SessionLog, session_log, "Contains information about all successful and failed login and logout events.") \
    M(TransactionsInfoLog, transactions_info_log, "Contains information about all transactions executed on a current server.") \
    M(ProcessorsProfileLog, processors_profile_log, "Contains profiling information on processors level (building blocks for a pipeline for query execution.") \
    M(AsynchronousInsertLog, asynchronous_insert_log, "Contains a history for all asynchronous inserts executed on current server.") \
    M(BackupLog, backup_log, "Contains logging entries with the information about BACKUP and RESTORE operations.") \
    M(BlobStorageLog, blob_storage_log, "Contains logging entries with information about various blob storage operations such as uploads and deletes.") \
    M(QueryMetricLog, query_metric_log, "Contains history of memory and metric values from table system.events for individual queries, periodically flushed to disk.") \
    M(DeadLetterQueue, dead_letter_queue, "Contains messages that came from a streaming engine (e.g. Kafka) and were parsed unsuccessfully.") \
    M(ZooKeeperConnectionLog, zookeeper_connection_log, "Contains history of ZooKeeper connections.") \
    M(AggregatedZooKeeperLog, aggregated_zookeeper_log, "Contains statistics (number of operations, latencies, errors) of ZooKeeper operations grouped by session_id, parent_path and operation. Periodically flushed to disk.") \
    M(IcebergMetadataLog, iceberg_metadata_log, "Contains content of Iceberg metadata files.") \
    M(DeltaMetadataLog, delta_lake_metadata_log, "Contains content of Delta metadata files.")
/// X-macro listing the system logs that this header only expands under `#if CLICKHOUSE_CLOUD`.
/// Entries use the same M(log_type, member, descr) shape as LIST_OF_ALL_SYSTEM_LOGS.
/// The last entry carries no trailing line continuation, so the macro cannot absorb the line
/// that follows it.
#define LIST_OF_CLOUD_SYSTEM_LOGS(M) \
    M(DistributedCacheLog, distributed_cache_log, "Contains the history of all interactions with distributed cache.") \
    M(DistributedCacheServerLog, distributed_cache_server_log, "Contains the history of all interactions with distributed cache client.")
namespace DB
{
/// Error codes referenced by this header. Only declared here; the definition lives elsewhere.
namespace ErrorCodes
{
/// Used by the unsupported IAST overrides of StorageWithComment (clone, formatImpl).
extern const int NOT_IMPLEMENTED;
}
/// AST node that pairs a storage definition with an optional comment literal.
/// It is produced by ParserStorageWithComment and acts as a plain data holder:
/// cloning and formatting are not supported and always throw NOT_IMPLEMENTED.
class StorageWithComment : public IAST
{
public:
    /// Storage definition part of the parsed text.
    ASTPtr storage;

    /// Literal that followed the COMMENT keyword; stays null when no comment was parsed.
    ASTPtr comment;

    String getID(char /* unused */) const override
    {
        return "Storage with comment definition";
    }

    /// Unsupported for this node: always throws.
    ASTPtr clone() const override
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method clone is not supported");
    }

protected:
    /// Unsupported for this node: always throws.
    void formatImpl(
        WriteBuffer & /* unused */,
        const FormatSettings & /* unused */,
        FormatState & /* unused */,
        FormatStateStacked /* unused */) const override
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method formatImpl is not supported");
    }
};
/// Parses a table engine storage definition optionally followed by `COMMENT <string literal>`
/// and produces a StorageWithComment node.
class ParserStorageWithComment : public IParserBase
{
protected:
    const char * getName() const override { return "storage definition with comment"; }

    /// Returns false when the storage definition cannot be parsed, or when the COMMENT keyword
    /// is present but is not followed by a string literal.
    /// On success `node` is a StorageWithComment whose `comment` is null if there was no COMMENT clause.
    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
    {
        ParserStorage storage_p{ParserStorage::TABLE_ENGINE};
        ASTPtr storage;
        if (!storage_p.parse(pos, storage, expected))
            return false;

        ParserKeyword s_comment(Keyword::COMMENT);
        ParserStringLiteral string_literal_parser;
        ASTPtr comment;

        /// The keyword alone is not a complete clause. Previously the result of parsing the literal
        /// was ignored, so a dangling COMMENT was silently accepted and yielded a null comment.
        if (s_comment.ignore(pos, expected) && !string_literal_parser.parse(pos, comment, expected))
            return false;

        auto storage_with_comment = make_intrusive<StorageWithComment>();
        storage_with_comment->storage = std::move(storage);
        storage_with_comment->comment = std::move(comment);
        node = storage_with_comment;
        return true;
    }
};
/** Allows storing a structured log in a system table.
*
* Logging is asynchronous. Data is put into a queue, from which it is read by a separate thread.
* That thread inserts the log into a table no more often than the specified period.
*/
/** Structure of the log element, passed as a template parameter.
* The structure may change when the server version is updated.
* If, on the first write, the existing table has a different structure,
* then it gets renamed (put aside) and a new table is created.
*/
/* Example:
struct LogElement
{
/// default constructor must be available
/// fields
static std::string name();
static ColumnsDescription getColumnsDescription();
/// TODO: Remove this method, we can return aliases directly from getColumnsDescription().
static NamesAndAliases getNamesAndAliases();
void appendToBlock(MutableColumns & columns) const;
};
*/
/// NOLINTBEGIN(bugprone-macro-parentheses)
#define FORWARD_DECLARATION(log_type, member, descr) \
class log_type; \
LIST_OF_ALL_SYSTEM_LOGS(FORWARD_DECLARATION)
#if CLICKHOUSE_CLOUD
LIST_OF_CLOUD_SYSTEM_LOGS(FORWARD_DECLARATION)
#endif
#undef FORWARD_DECLARATION
/// NOLINTEND(bugprone-macro-parentheses)
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
class SystemLogs
{
public:
SystemLogs() = default;
SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config);
SystemLogs(const SystemLogs & other) = default;
void flush(const std::vector<std::pair<String, String>> & names);
void flushAndShutdown();
void shutdown();
void handleCrash();
#define DECLARE_PUBLIC_MEMBERS(log_type, member, descr) \
std::shared_ptr<log_type> member; \
LIST_OF_ALL_SYSTEM_LOGS(DECLARE_PUBLIC_MEMBERS)
#if CLICKHOUSE_CLOUD
LIST_OF_CLOUD_SYSTEM_LOGS(DECLARE_PUBLIC_MEMBERS)
#endif
#undef DECLARE_PUBLIC_MEMBERS
private:
std::vector<ISystemLog *> getAllLogs() const;
void flushImpl(const std::vector<std::pair<String, String>> & names, bool should_prepare_tables_anyway, bool ignore_errors);
};
/// Settings bundle taken by the SystemLog constructor.
struct SystemLogSettings
{
/// Settings of the log queue; the type is declared outside this header.
SystemLogQueueSettings queue_settings;
/// NOTE(review): presumably the engine / storage definition text used when creating the table - confirm.
String engine;
/// Off by default. NOTE(review): presumably enables symbolization of stack traces in stored entries - confirm.
bool symbolize_traces = false;
};
/** System log that stores elements of type LogElement in a table.
* Builds on SystemLogBase<LogElement>; the class is non-copyable and carries a context (WithContext).
* Writing to the table is done asynchronously and, in case of failure, a record could be lost.
*/
template <typename LogElement>
class SystemLog : public SystemLogBase<LogElement>, private boost::noncopyable, public WithContext
{
public:
using Self = SystemLog;
using Base = SystemLogBase<LogElement>;
using Element = LogElement;
/** The log is written to a table.
* If the table does not exist, it is created with the specified engine.
* If it already exists, its structure is checked for compatibility with the structure of a log record.
* If it is compatible, the existing table is used.
* If not, the existing table is renamed to the same name with the suffix '_N' at the end,
* where N is the minimal number starting from 1 for which a table with that name doesn't exist yet,
* and a new table is created as if the previous table did not exist.
*
* queue_ is optional and defaults to nullptr.
*/
SystemLog(ContextPtr context_,
const SystemLogSettings & settings_,
std::shared_ptr<SystemLogQueue<LogElement>> queue_ = nullptr);
/** Overrides shutdown() of the base class.
* NOTE(review): the comment previously attached here described appending a record,
* which is not declared in this class - presumably a leftover from a method that now lives in the base.
*/
void shutdown() override;
/** Creates a new table if it does not exist.
* Renames the old table if its structure is not suitable.
* This cannot be done in the constructor, to avoid a deadlock while renaming a table under a locked Context
* when the SystemLog object is created.
*/
void prepareTable() override;
/// Identifier of the table this log writes to.
const StorageID & getTableID() const { return table_id; }
/// Returns the flush policy; dereferences flush_policy without a null check.
ISystemLogFlushPolicy & getFlushPolicy() { return *flush_policy; }
/// Forwards the requested manual flush target index to the flush policy.
void setManualFlushTargetIndex(ISystemLog::Index target_index) override
{
flush_policy->prepareManualFlush(target_index);
}
protected:
LoggerPtr log;
/// Makes the queue member of the dependent base class accessible by its unqualified name.
using Base::queue;
StoragePtr getStorage() const;
/// Some tables can override settings for internal queries
virtual void addSettingsForQuery(ContextMutablePtr & mutable_context, IAST::QueryKind query_kind) const;
private:
/* Saving thread data */
const StorageID table_id;
const String storage_def;
std::unique_ptr<ISystemLogFlushPolicy> flush_policy;
String create_query;
String old_create_query;
/// NOTE(review): presumably set once prepareTable() has completed - confirm in the implementation.
bool is_prepared = false;
void savingThreadFunction() override;
/// flushImpl can be executed only in saving_thread.
void flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end);
ASTPtr getCreateTableQuery();
};
}