Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions src/sentry/models/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import contextlib
import datetime
import sys
import time
from enum import IntEnum
from typing import Any, Generator, Iterable, List, Mapping, Set, Type, TypeVar

import sentry_sdk
from django.db import connections, models, router, transaction
from django.db.models import Max
from django.dispatch import Signal
Expand All @@ -23,6 +25,7 @@
)
from sentry.silo import SiloMode
from sentry.utils import metrics
from sentry.utils.sdk import set_measurement

THE_PAST = datetime.datetime(2016, 8, 1, 0, 0, 0, 0, tzinfo=timezone.utc)

Expand Down Expand Up @@ -174,24 +177,26 @@ def process_coalesced(self) -> Generator[OutboxBase | None, None, None]:
if coalesced is not None:
first_coalesced: OutboxBase = self.select_coalesced_messages().first() or coalesced
_, deleted = self.select_coalesced_messages().filter(id__lte=coalesced.id).delete()
tags = {"category": OutboxCategory(self.category).name}
metrics.incr("outbox.processed", deleted, tags=tags)
metrics.timing(

set_measurement(
"outbox.processing_lag",
datetime.datetime.now().timestamp() - first_coalesced.scheduled_from.timestamp(),
tags=tags,
"second",
)

def process(self) -> bool:
with self.process_coalesced() as coalesced:
if coalesced is not None:
with metrics.timer(
"outbox.send_signal.duration",
tags={"category": OutboxCategory(coalesced.category).name},
):
coalesced.send_signal()
return True
return False
with sentry_sdk.start_transaction(op="outbox.process", name="outboxprocess") as transaction:
transaction.set_tag("category_name", OutboxCategory(self.category).name)
transaction.set_data("shard", self.key_from(self.coalesced_columns))
with self.process_coalesced() as coalesced:
if coalesced is not None:
start = time.monotonic()
try:
coalesced.send_signal()
finally:
set_measurement("send_signal_duration", time.monotonic() - start, "second")
return True
return False

@abc.abstractmethod
def send_signal(self):
Expand Down
60 changes: 59 additions & 1 deletion src/sentry/services/hybrid_cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import contextlib
import dataclasses
import functools
import inspect
import logging
import random
import threading
from abc import ABC, abstractmethod
from typing import (
Expand All @@ -15,13 +17,16 @@
Generic,
List,
Mapping,
MutableMapping,
Type,
TypeVar,
cast,
)

import sentry_sdk
from rest_framework.request import Request
from sentry_sdk import Hub
from sentry_sdk.tracing import Transaction

from sentry.utils.cursors import Cursor, CursorResult
from sentry.utils.pagination_factory import (
Expand Down Expand Up @@ -184,7 +189,60 @@ def silo_mode_delegation(
Simply creates a DelegatedBySiloMode from a mapping object, but casts it as a ServiceInterface matching
the mapping values.
"""
return cast(ServiceInterface, DelegatedBySiloMode(mapping))
new_mapping: MutableMapping[SiloMode, Callable[[], ServiceInterface]] = {}
for k, factory in mapping.items():
new_mapping[k] = _annotate_with_metrics(factory)
return cast(ServiceInterface, DelegatedBySiloMode(new_mapping))


def _factory_decorator(
    decorate_service: Callable[[ServiceInterface], None]
) -> Callable[[Callable[[], ServiceInterface]], Callable[[], ServiceInterface]]:
    """
    Build a decorator for service factories: every instance produced by a
    decorated factory is passed through `decorate_service` before being
    returned (useful for, e.g., attaching metrics to service methods).
    """

    def apply_to_factory(
        factory: Callable[[], ServiceInterface]
    ) -> Callable[[], ServiceInterface]:
        @functools.wraps(factory)
        def instrumented_factory() -> ServiceInterface:
            service: ServiceInterface = factory()
            decorate_service(service)
            return service

        return instrumented_factory

    return apply_to_factory


@_factory_decorator
def _annotate_with_metrics(service: ServiceInterface) -> None:
    """
    Replace every implementation of a base-class abstract method on `service`
    with a metrics-emitting wrapper (see `_wrap_with_metrics`).

    Mutates `service` in place; applied to each instance by the factory
    decorator produced via `_factory_decorator`.
    """
    service_name = type(service).__name__
    # Collect the abstract method names first: with a single setattr+getattr
    # loop, a method declared abstract on more than one direct base would be
    # wrapped a second time (the second getattr sees the already-wrapped
    # attribute), nesting one transaction inside another per call.
    abstract_attrs = set()
    for Super in type(service).__bases__:
        for attr in dir(Super):
            base_val = getattr(Super, attr)
            if getattr(base_val, "__isabstractmethod__", False):
                abstract_attrs.add(attr)
    for attr in abstract_attrs:
        setattr(service, attr, _wrap_with_metrics(getattr(service, attr), service_name, attr))


def _wrap_with_metrics(
    m: Callable[..., Any], service_class_name: str, method_name: str
) -> Callable[..., Any]:
    """
    Wrap service method `m` so each call runs inside a sentry transaction
    named `hybrid_cloud.services.<class>.<method>`, tagged with the current
    silo mode.

    :param m: the bound method to instrument.
    :param service_class_name: name of the concrete service class (tag/name only).
    :param method_name: name of the method being wrapped (tag/name only).
    :return: a wrapper with `m`'s metadata (via functools.update_wrapper).
    """

    def wrapper(*args: Any, **kwds: Any) -> Any:
        # Force a ~10% client-side sample for these high-volume calls rather
        # than deferring to the SDK's configured sample rate.
        with sentry_sdk.start_transaction(
            name=f"hybrid_cloud.services.{service_class_name}.{method_name}",
            op="execute",
            sampled=random.random() < 0.1,
        ) as transaction:
            # Tag the transaction object start_transaction yields directly,
            # instead of re-reading Hub.current.scope.transaction — the yielded
            # object is never None, and this matches how other call sites in
            # this change use start_transaction.
            transaction.set_tag("silo_mode", SiloMode.get_current_mode().name)
            return m(*args, **kwds)

    functools.update_wrapper(wrapper, m)
    return wrapper


@dataclasses.dataclass
Expand Down
116 changes: 57 additions & 59 deletions src/sentry/tasks/deletion/hybrid_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from sentry.models import TombstoneBase
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import json, metrics, redis
from sentry.utils import json, redis
from sentry.utils.sdk import set_measurement


def deletion_silo_modes() -> List[SiloMode]:
Expand Down Expand Up @@ -59,14 +60,8 @@ def set_watermark(
get_watermark_key(prefix, field),
json.dumps((value, sha1(prev_transaction_id.encode("utf8")).hexdigest())),
)
metrics.gauge(
"deletion.hybrid_cloud.low_bound",
value,
tags=dict(
field_name=f"{field.model._meta.db_table}.{field.name}",
watermark=prefix,
),
)

set_measurement("low_bound", value)


def chunk_watermark_batch(
Expand Down Expand Up @@ -176,59 +171,62 @@ def _process_tombstone_reconciliation(
watermark_manager: Manager = field.model.objects
watermark_target = "r"

low, up, has_more, tid = chunk_watermark_batch(
prefix, field, watermark_manager, batch_size=get_batch_size()
)
to_delete_ids: List[int] = []
if low < up:
oldest_seen: datetime.datetime = timezone.now()

with connections[router.db_for_read(model)].cursor() as conn:
conn.execute(
f"""
SELECT r.id, t.created_at FROM {model._meta.db_table} r JOIN {tombstone_cls._meta.db_table} t
ON t.table_name = %(table_name)s AND t.object_identifier = r.{field.name}
WHERE {watermark_target}.id > %(low)s AND {watermark_target}.id <= %(up)s
""",
{
"table_name": field.foreign_table_name,
"low": low,
"up": up,
},
)

for (row_id, tomb_created) in conn.fetchall():
to_delete_ids.append(row_id)
oldest_seen = min(oldest_seen, tomb_created)
with sentry_sdk.start_transaction(
name="deletion.hybrid_cloud.process_tombstone_reconciliation", op="process"
) as transaction:
transaction.set_tag("field_name", f"{model._meta.db_table}.{field.name}")
transaction.set_tag("watermark", prefix)

if field.on_delete == "CASCADE":
task = deletions.get(
model=model,
query={"id__in": to_delete_ids},
transaction_id=tid,
)

if task.chunk():
has_more = True # The current batch is not complete, rerun this task again
else:
low, up, has_more, tid = chunk_watermark_batch(
prefix, field, watermark_manager, batch_size=get_batch_size()
)
to_delete_ids: List[int] = []
if low < up:
oldest_seen: datetime.datetime = timezone.now()

with connections[router.db_for_read(model)].cursor() as conn:
conn.execute(
f"""
SELECT r.id, t.created_at FROM {model._meta.db_table} r JOIN {tombstone_cls._meta.db_table} t
ON t.table_name = %(table_name)s AND t.object_identifier = r.{field.name}
WHERE {watermark_target}.id > %(low)s AND {watermark_target}.id <= %(up)s
""",
{
"table_name": field.foreign_table_name,
"low": low,
"up": up,
},
)

for (row_id, tomb_created) in conn.fetchall():
to_delete_ids.append(row_id)
oldest_seen = min(oldest_seen, tomb_created)

if field.on_delete == "CASCADE":
task = deletions.get(
model=model,
query={"id__in": to_delete_ids},
transaction_id=tid,
)

if task.chunk():
has_more = True # The current batch is not complete, rerun this task again
else:
set_watermark(prefix, field, up, tid)

elif field.on_delete == "SET_NULL":
model.objects.filter(id__in=to_delete_ids).update(**{field.name: None})
set_watermark(prefix, field, up, tid)

elif field.on_delete == "SET_NULL":
model.objects.filter(id__in=to_delete_ids).update(**{field.name: None})
set_watermark(prefix, field, up, tid)

else:
raise ValueError(
f"{field.model.__name__}.{field.name} has unexpected on_delete={field.on_delete}, could not process delete!"
else:
raise ValueError(
f"{field.model.__name__}.{field.name} has unexpected on_delete={field.on_delete}, could not process delete!"
)

set_measurement(
"processing_lag",
datetime.datetime.now().timestamp() - oldest_seen.timestamp(),
"second",
)

metrics.timing(
"deletion.hybrid_cloud.processing_lag",
datetime.datetime.now().timestamp() - oldest_seen.timestamp(),
tags=dict(
field_name=f"{model._meta.db_table}.{field.name}",
watermark=prefix,
),
)

return has_more