Skip to content

Commit 381974a

Browse files
committed
elasticotel: move to upstream opamp client
Delete our version of the client and updates the distro code for the upstream changes regarding naming and client callbacks. This keeps a bunch of e2e tests against our own server implementation.
1 parent 5a2f2cd commit 381974a

27 files changed

Lines changed: 256 additions & 4181 deletions

dev-requirements.txt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ opentelemetry-api==1.40.0
3939
# opentelemetry-exporter-otlp-proto-http
4040
# opentelemetry-instrumentation
4141
# opentelemetry-instrumentation-system-metrics
42+
# opentelemetry-opamp-client
4243
# opentelemetry-resourcedetector-gcp
4344
# opentelemetry-sdk
4445
# opentelemetry-semantic-conventions
@@ -59,6 +60,8 @@ opentelemetry-instrumentation==0.61b0
5960
# opentelemetry-instrumentation-system-metrics
6061
opentelemetry-instrumentation-system-metrics==0.61b0
6162
# via elastic-opentelemetry (pyproject.toml)
63+
opentelemetry-opamp-client==0.1b0
64+
# via elastic-opentelemetry (pyproject.toml)
6265
opentelemetry-proto==1.40.0
6366
# via
6467
# opentelemetry-exporter-otlp-proto-common
@@ -103,6 +106,7 @@ pluggy==1.6.0
103106
protobuf==6.33.5
104107
# via
105108
# googleapis-common-protos
109+
# opentelemetry-opamp-client
106110
# opentelemetry-proto
107111
# oteltest
108112
psutil==7.2.2
@@ -137,7 +141,9 @@ typing-extensions==4.15.0
137141
urllib3==2.6.3
138142
# via requests
139143
uuid-utils==0.14.1
140-
# via elastic-opentelemetry (pyproject.toml)
144+
# via
145+
# elastic-opentelemetry (pyproject.toml)
146+
# opentelemetry-opamp-client
141147
wheel==0.46.3
142148
# via pip-tools
143149
wrapt==1.17.3

opamp-gen-requirements.txt

Lines changed: 0 additions & 5 deletions
This file was deleted.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ classifiers = [
2222
"Programming Language :: Python :: 3.11",
2323
"Programming Language :: Python :: 3.12",
2424
"Programming Language :: Python :: 3.13",
25+
"Programming Language :: Python :: 3.14",
2526
"Typing :: Typed",
2627
]
2728

@@ -36,6 +37,7 @@ dependencies = [
3637
"opentelemetry-sdk == 1.40.0",
3738
"opentelemetry-sdk-extension-aws ~= 2.1.0",
3839
"opentelemetry-semantic-conventions == 0.61b0",
40+
"opentelemetry-opamp-client == 0.1b0",
3941
"packaging",
4042
"uuid-utils",
4143
]
@@ -108,10 +110,8 @@ pythonVersion = "3.9"
108110

109111
include = [
110112
"src/elasticotel",
111-
"src/opentelemetry",
112113
]
113114

114115
exclude = [
115116
"**/__pycache__",
116-
"src/opentelemetry/_opamp/proto",
117117
]

scripts/opamp_proto_codegen.sh

Lines changed: 0 additions & 81 deletions
This file was deleted.

src/elasticotel/distro/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
ELASTIC_OTEL_OPAMP_CLIENT_KEY,
6767
)
6868
from elasticotel.distro.resource_detectors import get_cloud_resource_detectors
69-
from elasticotel.distro.config import opamp_handler, _initialize_config, DEFAULT_SAMPLING_RATE
69+
from elasticotel.distro.config import EDOTOpAMPCallbacks, _initialize_config, DEFAULT_SAMPLING_RATE
7070

7171

7272
logger = logging.getLogger(__name__)
@@ -147,7 +147,7 @@ def _configure(self, **kwargs):
147147
)
148148
opamp_agent = OpAMPAgent(
149149
interval=30,
150-
message_handler=opamp_handler,
150+
callbacks=EDOTOpAMPCallbacks(),
151151
client=opamp_client,
152152
)
153153
opamp_agent.start()

src/elasticotel/distro/config.py

Lines changed: 42 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from opentelemetry import trace
2727
from opentelemetry._opamp import messages
2828
from opentelemetry._opamp.agent import OpAMPAgent
29+
from opentelemetry._opamp.callbacks import Callbacks, MessageData
2930
from opentelemetry._opamp.client import OpAMPClient
3031
from opentelemetry._opamp.exceptions import (
3132
OpAMPRemoteConfigDecodeException,
@@ -253,52 +254,46 @@ def _get_config():
253254
return _config
254255

255256

256-
def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.ServerToAgent):
257-
# server wants us to report full state as it cannot recognize us as agent because
258-
# e.g it may have been restarted and lost state.
259-
if _report_full_state(message):
260-
# here we're not returning explicitly but usually we don't get a remote config when we get the flag set
261-
payload = client._build_full_state_message()
262-
agent.send(payload=payload)
263-
264-
# we check config_hash because we need to track last received config and remote_config seems to be always truthy
265-
if not message.remote_config or not message.remote_config.config_hash:
266-
return
257+
class EDOTOpAMPCallbacks(Callbacks):
258+
def on_message(self, agent: OpAMPAgent, client: OpAMPClient, message: MessageData):
259+
# we check config_hash because we need to track last received config and remote_config seems to be always truthy
260+
if not message.remote_config or not message.remote_config.config_hash:
261+
return
267262

268-
_config = _get_config()
269-
error_messages = []
270-
try:
271-
for config_filename, remote_config in messages._decode_remote_config(message.remote_config):
272-
# we don't have standardized config values so limit to configs coming from our backend
273-
if config_filename == "elastic":
274-
logger.debug("Config %s: %s", config_filename, remote_config)
275-
config_update = _handle_logging_level(remote_config)
276-
if config_update.error_message:
277-
error_messages.append(config_update.error_message)
278-
279-
config_update = _handle_sampling_rate(remote_config)
280-
if config_update.error_message:
281-
error_messages.append(config_update.error_message)
282-
283-
config_update = _handle_deactivate_instrumentations(remote_config)
284-
if config_update.error_message:
285-
error_messages.append(config_update.error_message)
286-
except (OpAMPRemoteConfigParseException, OpAMPRemoteConfigDecodeException) as exc:
287-
logger.error(str(exc))
288-
error_messages.append(str(exc))
289-
290-
error_message = "\n".join(error_messages)
291-
status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED
292-
updated_remote_config = client._update_remote_config_status(
293-
remote_config_hash=message.remote_config.config_hash, status=status, error_message=error_message
294-
)
295-
296-
# update the cached effective config with what we updated
297-
if _config:
298-
effective_config = {"elastic": _config.to_dict()}
299-
client._update_effective_config(effective_config)
263+
_config = _get_config()
264+
error_messages = []
265+
try:
266+
for config_filename, remote_config in messages.decode_remote_config(message.remote_config):
267+
# we don't have standardized config values so limit to configs coming from our backend
268+
if config_filename == "elastic":
269+
logger.debug("Config %s: %s", config_filename, remote_config)
270+
config_update = _handle_logging_level(remote_config)
271+
if config_update.error_message:
272+
error_messages.append(config_update.error_message)
273+
274+
config_update = _handle_sampling_rate(remote_config)
275+
if config_update.error_message:
276+
error_messages.append(config_update.error_message)
277+
278+
config_update = _handle_deactivate_instrumentations(remote_config)
279+
if config_update.error_message:
280+
error_messages.append(config_update.error_message)
281+
except (OpAMPRemoteConfigParseException, OpAMPRemoteConfigDecodeException) as exc:
282+
logger.error(str(exc))
283+
error_messages.append(str(exc))
284+
285+
error_message = "\n".join(error_messages)
286+
status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED
287+
updated_remote_config = client.update_remote_config_status(
288+
remote_config_hash=message.remote_config.config_hash, status=status, error_message=error_message
289+
)
290+
291+
# update the cached effective config with what we updated
292+
if _config:
293+
effective_config = {"elastic": _config.to_dict()}
294+
client.update_effective_config(effective_config, content_type="application/json")
300295

301-
# if we changed the config send an ack to the server so we don't receive the same config at every heartbeat response
302-
if updated_remote_config is not None:
303-
payload = client._build_remote_config_status_response_message(updated_remote_config)
304-
agent.send(payload=payload)
296+
# if we changed the config send an ack to the server so we don't receive the same config at every heartbeat response
297+
if updated_remote_config is not None:
298+
payload = client.build_remote_config_status_response_message(updated_remote_config)
299+
agent.send(payload=payload)

src/opentelemetry/_opamp/README.md

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/opentelemetry/_opamp/__init__.py

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)