Skip to content

Commit 3d71ad0

Browse files
yakkomajuri and EDsCODE authored
feat(persons-on-events): add required person and group columns to events table (#9251)
* refactor(ingestion): establish setup for json consumption from kafka into clickhouse [nuke protobuf pt. 1] * address review * fix kafka table name across the board * Update posthog/async_migrations/test/test_0004_replicated_schema.py * run checks * feat(persons-on-events): add required person and group columns to events table * rename * update snapshots * address review * Revert "update snapshots" This reverts commit 63d7126. * address review * update snapshots * update more snapshots * use runpython * update schemas * update more queries * some improvements :D * fix naming * fix breakdown prop name * update snapshot * fix naming * fix ambiguous test * fix queries' * last bits * fix typo to retrigger tests * also handle kafka and mv tables in migration * update snapshots * drop tables if exists Co-authored-by: eric <[email protected]>
1 parent 38bf3ef commit 3d71ad0

File tree

22 files changed

+164
-47
lines changed

22 files changed

+164
-47
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ For <100K events ingested monthly on Linux with Docker (recommended 4GB memory):
2727
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/posthog/posthog/HEAD/bin/deploy-hobby)"
2828
```
2929
### Option 2: Production instance on your infrastructure
30-
Follow our <a href="https://posthog.com/docs/self-host/overview#deploy">Scaleable Self-Hosting Guide</a> for all major cloud service providers and on-premise deploys
30+
Follow our <a href="https://posthog.com/docs/self-host/overview#deploy">Scalable Self-Hosting Guide</a> for all major cloud service providers and on-premise deploys
3131

3232
### Option 3: If you don't need to self-host
3333
Sign up for a free [PostHog Cloud](https://app.posthog.com/signup) project

ee/clickhouse/materialized_columns/columns.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
from constance import config
66
from django.utils.timezone import now
77

8-
from ee.clickhouse.materialized_columns.replication import clickhouse_is_replicated
98
from ee.clickhouse.materialized_columns.util import cache_for
9+
from ee.clickhouse.replication.utils import clickhouse_is_replicated
1010
from ee.clickhouse.sql.clickhouse import trim_quotes_expr
1111
from posthog.client import sync_execute
1212
from posthog.models.property import PropertyName, TableWithProperties
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from infi.clickhouse_orm import migrations
2+
3+
from ee.clickhouse.replication.utils import clickhouse_is_replicated
4+
from ee.clickhouse.sql.events import EVENTS_TABLE_JSON_MV_SQL, KAFKA_EVENTS_TABLE_JSON_SQL
5+
from posthog.client import sync_execute
6+
from posthog.settings import CLICKHOUSE_CLUSTER
7+
8+
ADD_COLUMNS_BASE_SQL = """
9+
ALTER TABLE {table}
10+
ON CLUSTER '{cluster}'
11+
ADD COLUMN IF NOT EXISTS person_id UUID,
12+
ADD COLUMN IF NOT EXISTS person_properties VARCHAR,
13+
ADD COLUMN IF NOT EXISTS group0_properties VARCHAR,
14+
ADD COLUMN IF NOT EXISTS group1_properties VARCHAR,
15+
ADD COLUMN IF NOT EXISTS group2_properties VARCHAR,
16+
ADD COLUMN IF NOT EXISTS group3_properties VARCHAR,
17+
ADD COLUMN IF NOT EXISTS group4_properties VARCHAR
18+
"""
19+
20+
21+
def add_columns_to_required_tables(_):
22+
sync_execute(ADD_COLUMNS_BASE_SQL.format(table="events", cluster=CLICKHOUSE_CLUSTER))
23+
24+
if clickhouse_is_replicated():
25+
sync_execute(ADD_COLUMNS_BASE_SQL.format(table="writable_events", cluster=CLICKHOUSE_CLUSTER))
26+
sync_execute(ADD_COLUMNS_BASE_SQL.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
27+
28+
29+
operations = [
30+
migrations.RunPython(add_columns_to_required_tables),
31+
migrations.RunSQL(f"DROP TABLE IF EXISTS events_json_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
32+
migrations.RunSQL(f"DROP TABLE IF EXISTS kafka_events_json ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
33+
migrations.RunSQL(KAFKA_EVENTS_TABLE_JSON_SQL()),
34+
migrations.RunSQL(EVENTS_TABLE_JSON_MV_SQL()),
35+
]

ee/clickhouse/models/cohort.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def format_person_query(
5555

5656
for group_idx, group in enumerate(groups):
5757
if group.get("action_id") or group.get("event_id"):
58-
entity_query, entity_params = get_entity_cohort_subquery(cohort, group, group_idx)
58+
entity_query, entity_params = get_entity_cohort_subquery(cohort, group, group_idx, custom_match_field)
5959
params = {**params, **entity_params}
6060
filters.append(entity_query)
6161

@@ -128,7 +128,9 @@ def get_properties_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx
128128
return "\n".join(query_parts).replace("AND ", "", 1), params
129129

130130

131-
def get_entity_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx: int):
131+
def get_entity_cohort_subquery(
132+
cohort: Cohort, cohort_group: Dict, group_idx: int, custom_match_field: str = "person_id"
133+
):
132134
event_id = cohort_group.get("event_id")
133135
action_id = cohort_group.get("action_id")
134136
days = cohort_group.get("days")
@@ -157,7 +159,7 @@ def get_entity_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx: in
157159

158160
params: Dict[str, Union[str, int]] = {"count": int(count), **entity_params, **date_params}
159161

160-
return f"{'NOT' if is_negation else ''} person_id IN ({extract_person})", params
162+
return f"{'NOT' if is_negation else ''} {custom_match_field} IN ({extract_person})", params
161163
else:
162164
extract_person = GET_DISTINCT_ID_BY_ENTITY_SQL.format(entity_query=entity_query, date_query=date_query,)
163165
return f"distinct_id IN ({extract_person})", {**entity_params, **date_params}
@@ -240,7 +242,7 @@ def is_precalculated_query(cohort: Cohort) -> bool:
240242

241243

242244
def format_filter_query(cohort: Cohort, index: int = 0, id_column: str = "distinct_id") -> Tuple[str, Dict[str, Any]]:
243-
person_query, params = format_cohort_subquery(cohort, index)
245+
person_query, params = format_cohort_subquery(cohort, index, "person_id")
244246

245247
person_id_query = CALCULATE_COHORT_PEOPLE_SQL.format(
246248
query=person_query,

ee/clickhouse/queries/breakdown_props.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def get_breakdown_prop_values(
5050
prepend="e_brkdwn",
5151
person_properties_mode=PersonPropertiesMode.USING_PERSON_PROPERTIES_COLUMN,
5252
allow_denormalized_props=True,
53+
person_id_joined_alias="pdi.person_id",
5354
)
5455

5556
entity_params, entity_format_params = get_entity_filtering_params(entity=entity, team_id=team_id, table_name="e")

ee/clickhouse/queries/related_actors_query.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020

2121
class RelatedActorsQuery:
22+
DISTINCT_ID_TABLE_ALIAS = "pdi"
23+
2224
"""
2325
This query calculates other groups and persons that are related to a person or a group.
2426
@@ -49,7 +51,7 @@ def _query_related_people(self) -> List[SerializedPerson]:
4951
person_ids = self._take_first(
5052
sync_execute(
5153
f"""
52-
SELECT DISTINCT person_id
54+
SELECT DISTINCT {self.DISTINCT_ID_TABLE_ALIAS}.person_id
5355
FROM events e
5456
{self._distinct_ids_join}
5557
WHERE team_id = %(team_id)s
@@ -102,11 +104,11 @@ def _filter_clause(self):
102104
if self.is_aggregating_by_groups:
103105
return f"$group_{self.group_type_index} = %(id)s"
104106
else:
105-
return "person_id = %(id)s"
107+
return f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id = %(id)s"
106108

107109
@property
108110
def _distinct_ids_join(self):
109-
return f"JOIN ({get_team_distinct_ids_query(self.team_id)}) pdi on e.distinct_id = pdi.distinct_id"
111+
return f"JOIN ({get_team_distinct_ids_query(self.team_id)}) {self.DISTINCT_ID_TABLE_ALIAS} on e.distinct_id = {self.DISTINCT_ID_TABLE_ALIAS}.distinct_id"
110112

111113
@cached_property
112114
def _params(self):

ee/clickhouse/queries/test/__snapshots__/test_lifecycle.ambr

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
4949
created_at
5050
FROM
51-
(SELECT DISTINCT person_id,
51+
(SELECT DISTINCT pdi.person_id as person_id,
5252
toDateTime(dateTrunc('day', events.timestamp)) AS period,
5353
person.created_at AS created_at
5454
FROM events AS e
@@ -131,7 +131,7 @@
131131
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
132132
created_at
133133
FROM
134-
(SELECT DISTINCT person_id,
134+
(SELECT DISTINCT pdi.person_id as person_id,
135135
toDateTime(dateTrunc('month', events.timestamp)) AS period,
136136
person.created_at AS created_at
137137
FROM events AS e
@@ -214,7 +214,7 @@
214214
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
215215
created_at
216216
FROM
217-
(SELECT DISTINCT person_id,
217+
(SELECT DISTINCT pdi.person_id as person_id,
218218
toDateTime(dateTrunc('week', events.timestamp)) AS period,
219219
person.created_at AS created_at
220220
FROM events AS e
@@ -297,7 +297,7 @@
297297
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
298298
created_at
299299
FROM
300-
(SELECT DISTINCT person_id,
300+
(SELECT DISTINCT pdi.person_id as person_id,
301301
toDateTime(dateTrunc('day', events.timestamp)) AS period,
302302
person.created_at AS created_at
303303
FROM events AS e
@@ -380,7 +380,7 @@
380380
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
381381
created_at
382382
FROM
383-
(SELECT DISTINCT person_id,
383+
(SELECT DISTINCT pdi.person_id as person_id,
384384
toDateTime(dateTrunc('day', events.timestamp)) AS period,
385385
person.created_at AS created_at
386386
FROM events AS e

ee/clickhouse/queries/trends/breakdown.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242

4343

4444
class ClickhouseTrendsBreakdown:
45+
DISTINCT_ID_TABLE_ALIAS = "pdi"
46+
4547
def __init__(
4648
self, entity: Entity, filter: Filter, team: Team, column_optimizer: Optional[EnterpriseColumnOptimizer] = None
4749
):
@@ -69,8 +71,11 @@ def get_query(self) -> Tuple[str, Dict, Callable]:
6971
property_group=outer_properties,
7072
table_name="e",
7173
person_properties_mode=PersonPropertiesMode.USING_PERSON_PROPERTIES_COLUMN,
74+
person_id_joined_alias=f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id",
75+
)
76+
aggregate_operation, _, math_params = process_math(
77+
self.entity, self.team, event_table_alias="e", person_id_alias=f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id"
7278
)
73-
aggregate_operation, _, math_params = process_math(self.entity, self.team, event_table_alias="e")
7479

7580
action_query = ""
7681
action_params: Dict = {}
@@ -341,7 +346,7 @@ def _person_join_condition(self) -> Tuple[str, Dict]:
341346
f"""
342347
{event_join}
343348
INNER JOIN ({query}) person
344-
ON person.id = pdi.person_id
349+
ON person.id = {self.DISTINCT_ID_TABLE_ALIAS}.person_id
345350
""",
346351
params,
347352
)

ee/clickhouse/queries/trends/lifecycle.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def get_query(self):
105105
return (
106106
f"""
107107
SELECT DISTINCT
108-
person_id,
108+
{self.DISTINCT_ID_TABLE_ALIAS}.person_id as person_id,
109109
toDateTime(dateTrunc(%(interval)s, events.timestamp)) AS period,
110110
person.created_at AS created_at
111111
FROM events AS {self.EVENT_TABLE_ALIAS}

ee/clickhouse/queries/trends/total_volume.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class ClickhouseTrendsTotalVolume:
2323
def _total_volume_query(self, entity: Entity, filter: Filter, team: Team) -> Tuple[str, Dict, Callable]:
2424
trunc_func = get_trunc_func_ch(filter.interval)
2525
interval_func = get_interval_func_ch(filter.interval)
26-
aggregate_operation, join_condition, math_params = process_math(entity, team)
26+
aggregate_operation, join_condition, math_params = process_math(entity, team, person_id_alias="person_id")
2727

2828
trend_event_query = TrendsEventQuery(
2929
filter=filter,

0 commit comments

Comments
 (0)