|
26 | 26 | from opentelemetry import trace |
27 | 27 | from opentelemetry._opamp import messages |
28 | 28 | from opentelemetry._opamp.agent import OpAMPAgent |
| 29 | +from opentelemetry._opamp.callbacks import Callbacks, MessageData |
29 | 30 | from opentelemetry._opamp.client import OpAMPClient |
30 | 31 | from opentelemetry._opamp.exceptions import ( |
31 | 32 | OpAMPRemoteConfigDecodeException, |
@@ -253,52 +254,46 @@ def _get_config(): |
253 | 254 | return _config |
254 | 255 |
|
255 | 256 |
|
256 | | -def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.ServerToAgent): |
257 | | - # server wants us to report full state as it cannot recognize us as agent because |
258 | | - # e.g it may have been restarted and lost state. |
259 | | - if _report_full_state(message): |
260 | | - # here we're not returning explicitly but usually we don't get a remote config when we get the flag set |
261 | | - payload = client._build_full_state_message() |
262 | | - agent.send(payload=payload) |
263 | | - |
264 | | - # we check config_hash because we need to track last received config and remote_config seems to be always truthy |
265 | | - if not message.remote_config or not message.remote_config.config_hash: |
266 | | - return |
| 257 | +class EDOTOpAMPCallbacks(Callbacks): |
| 258 | + def on_message(self, agent: OpAMPAgent, client: OpAMPClient, message: MessageData): |
| 259 | + # we check config_hash because we need to track last received config and remote_config seems to be always truthy |
| 260 | + if not message.remote_config or not message.remote_config.config_hash: |
| 261 | + return |
267 | 262 |
|
268 | | - _config = _get_config() |
269 | | - error_messages = [] |
270 | | - try: |
271 | | - for config_filename, remote_config in messages._decode_remote_config(message.remote_config): |
272 | | - # we don't have standardized config values so limit to configs coming from our backend |
273 | | - if config_filename == "elastic": |
274 | | - logger.debug("Config %s: %s", config_filename, remote_config) |
275 | | - config_update = _handle_logging_level(remote_config) |
276 | | - if config_update.error_message: |
277 | | - error_messages.append(config_update.error_message) |
278 | | - |
279 | | - config_update = _handle_sampling_rate(remote_config) |
280 | | - if config_update.error_message: |
281 | | - error_messages.append(config_update.error_message) |
282 | | - |
283 | | - config_update = _handle_deactivate_instrumentations(remote_config) |
284 | | - if config_update.error_message: |
285 | | - error_messages.append(config_update.error_message) |
286 | | - except (OpAMPRemoteConfigParseException, OpAMPRemoteConfigDecodeException) as exc: |
287 | | - logger.error(str(exc)) |
288 | | - error_messages.append(str(exc)) |
289 | | - |
290 | | - error_message = "\n".join(error_messages) |
291 | | - status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED |
292 | | - updated_remote_config = client._update_remote_config_status( |
293 | | - remote_config_hash=message.remote_config.config_hash, status=status, error_message=error_message |
294 | | - ) |
295 | | - |
296 | | - # update the cached effective config with what we updated |
297 | | - if _config: |
298 | | - effective_config = {"elastic": _config.to_dict()} |
299 | | - client._update_effective_config(effective_config) |
| 263 | + _config = _get_config() |
| 264 | + error_messages = [] |
| 265 | + try: |
| 266 | + for config_filename, remote_config in messages.decode_remote_config(message.remote_config): |
| 267 | + # we don't have standardized config values so limit to configs coming from our backend |
| 268 | + if config_filename == "elastic": |
| 269 | + logger.debug("Config %s: %s", config_filename, remote_config) |
| 270 | + config_update = _handle_logging_level(remote_config) |
| 271 | + if config_update.error_message: |
| 272 | + error_messages.append(config_update.error_message) |
| 273 | + |
| 274 | + config_update = _handle_sampling_rate(remote_config) |
| 275 | + if config_update.error_message: |
| 276 | + error_messages.append(config_update.error_message) |
| 277 | + |
| 278 | + config_update = _handle_deactivate_instrumentations(remote_config) |
| 279 | + if config_update.error_message: |
| 280 | + error_messages.append(config_update.error_message) |
| 281 | + except (OpAMPRemoteConfigParseException, OpAMPRemoteConfigDecodeException) as exc: |
| 282 | + logger.error(str(exc)) |
| 283 | + error_messages.append(str(exc)) |
| 284 | + |
| 285 | + error_message = "\n".join(error_messages) |
| 286 | + status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED |
| 287 | + updated_remote_config = client.update_remote_config_status( |
| 288 | + remote_config_hash=message.remote_config.config_hash, status=status, error_message=error_message |
| 289 | + ) |
| 290 | + |
| 291 | + # update the cached effective config with what we updated |
| 292 | + if _config: |
| 293 | + effective_config = {"elastic": _config.to_dict()} |
| 294 | + client.update_effective_config(effective_config, content_type="application/json") |
300 | 295 |
|
301 | | - # if we changed the config send an ack to the server so we don't receive the same config at every heartbeat response |
302 | | - if updated_remote_config is not None: |
303 | | - payload = client._build_remote_config_status_response_message(updated_remote_config) |
304 | | - agent.send(payload=payload) |
| 296 | + # if we changed the config send an ack to the server so we don't receive the same config at every heartbeat response |
| 297 | + if updated_remote_config is not None: |
| 298 | + payload = client.build_remote_config_status_response_message(updated_remote_config) |
| 299 | + agent.send(payload=payload) |
0 commit comments