diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index f659f96551..8c1eaaa10b 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,17 +15,20 @@
# limitations under the License.
import logging
+from itertools import chain
-from twisted.internet import defer
+from twisted.internet.defer import DeferredLock
-from synapse.api.constants import EventTypes, Membership
-from synapse.storage.prepare_database import get_statements
+from synapse.storage import PostgresEngine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
ABSOLUTE_STATS_FIELDS = {
"room": (
"current_state_events",
@@ -32,14 +36,18 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
- "state_events",
+ "total_events",
+ "total_event_bytes",
),
"user": ("public_rooms", "private_rooms"),
}
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
-TEMP_TABLE = "_temp_populate_stats"
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
class StatsStore(StateDeltasStore):
@@ -51,294 +59,111 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
- self.register_background_update_handler(
- "populate_stats_createtables", self._populate_stats_createtables
- )
- self.register_background_update_handler(
- "populate_stats_process_rooms", self._populate_stats_process_rooms
- )
- self.register_background_update_handler(
- "populate_stats_cleanup", self._populate_stats_cleanup
- )
-
- @defer.inlineCallbacks
- def _populate_stats_createtables(self, progress, batch_size):
-
- if not self.stats_enabled:
- yield self._end_background_update("populate_stats_createtables")
- return 1
-
- # TODO dev only
- yield self.delete_all_stats()
-
- # Get all the rooms that we want to process.
- def _make_staging_area(txn):
- # Create the temporary tables
- stmts = get_statements(
- """
- -- We just recreate the table, we'll be reinserting the
- -- correct entries again later anyway.
- DROP TABLE IF EXISTS {temp}_rooms;
-
- CREATE TABLE IF NOT EXISTS {temp}_rooms(
- room_id TEXT NOT NULL,
- events BIGINT NOT NULL
- );
-
- CREATE INDEX {temp}_rooms_events
- ON {temp}_rooms(events);
- CREATE INDEX {temp}_rooms_id
- ON {temp}_rooms(room_id);
- """.format(
- temp=TEMP_TABLE
- ).splitlines()
- )
+ self.stats_delta_processing_lock = DeferredLock()
- for statement in stmts:
- txn.execute(statement)
-
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_position(position TEXT NOT NULL)"
- )
- txn.execute(sql)
-
- # Get rooms we want to process from the database, only adding
- # those that we haven't (i.e. those not in room_stats_earliest_token)
- sql = """
- INSERT INTO %s_rooms (room_id, events)
- SELECT c.room_id, count(*) FROM current_state_events AS c
- LEFT JOIN room_stats_earliest_token AS t USING (room_id)
- WHERE t.room_id IS NULL
- GROUP BY c.room_id
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
+ self.register_noop_background_update("populate_stats_createtables")
+ self.register_noop_background_update("populate_stats_process_rooms")
+ self.register_noop_background_update("populate_stats_cleanup")
- new_pos = yield self.get_max_stream_id_in_current_state_deltas()
- yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
- yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
- self.get_earliest_token_for_room_stats.invalidate_all()
+ def quantise_stats_time(self, ts):
+ """
+ Quantises a timestamp to be a multiple of the bucket size.
- yield self._end_background_update("populate_stats_createtables")
- return 1
+ Args:
+ ts (int): the timestamp to quantise, in milliseconds since the Unix
+ Epoch
- @defer.inlineCallbacks
- def _populate_stats_cleanup(self, progress, batch_size):
+ Returns:
+ int: a timestamp which
+ - is divisible by the bucket size;
+ - is no later than `ts`; and
+ - is the largest such timestamp.
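+
+        Example (illustrative; e.g. with a bucket size of 86400000 ms, one day):
+            quantise_stats_time(86400001) == 86400000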
"""
- Update the user directory stream position, then clean up the old tables.
+ return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
+ def get_stats_positions(self, for_initial_processor=False):
"""
- if not self.stats_enabled:
- yield self._end_background_update("populate_stats_cleanup")
- return 1
+ Returns the stats processor positions.
- position = yield self._simple_select_one_onecol(
- TEMP_TABLE + "_position", None, "position"
+ Args:
+ for_initial_processor (bool, optional): If true, returns the position
+ promised by the latest stats regeneration, rather than the current
+ incremental processor's position.
+                Otherwise (if false), returns the incremental processor's position.
+
+ Returns (dict):
+ Dict containing :-
+ state_delta_stream_id: stream_id of last-processed state delta
+ total_events_min_stream_ordering: stream_ordering of latest-processed
+ backfilled event, in the context of total_events counting.
+ total_events_max_stream_ordering: stream_ordering of latest-processed
+ non-backfilled event, in the context of total_events counting.
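+
+        Example return value (illustrative values only):
+            {
+                "state_delta_stream_id": 4321,
+                "total_events_min_stream_ordering": -5,
+                "total_events_max_stream_ordering": 1234,
+            }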
+ """
+ return self._simple_select_one(
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ retcols=(
+ "state_delta_stream_id",
+ "total_events_min_stream_ordering",
+ "total_events_max_stream_ordering",
+ ),
+ desc="stats_incremental_position",
)
- yield self.update_stats_stream_pos(position)
-
- def _delete_staging_area(txn):
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
-
- yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
-
- yield self._end_background_update("populate_stats_cleanup")
- return 1
-
- @defer.inlineCallbacks
- def _populate_stats_process_rooms(self, progress, batch_size):
- if not self.stats_enabled:
- yield self._end_background_update("populate_stats_process_rooms")
- return 1
-
- # If we don't have progress filed, delete everything.
- if not progress:
- yield self.delete_all_stats()
-
- def _get_next_batch(txn):
- # Only fetch 250 rooms, so we don't fetch too many at once, even
- # if those 250 rooms have less than batch_size state events.
- sql = """
- SELECT room_id, events FROM %s_rooms
- ORDER BY events DESC
- LIMIT 250
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
- rooms_to_work_on = txn.fetchall()
-
- if not rooms_to_work_on:
- return None
-
- # Get how many are left to process, so we can give status on how
- # far we are in processing
- txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
- progress["remaining"] = txn.fetchone()[0]
-
- return rooms_to_work_on
+ def _get_stats_positions_txn(self, txn, for_initial_processor=False):
+ """
+ See L{get_stats_positions}.
- rooms_to_work_on = yield self.runInteraction(
- "populate_stats_temp_read", _get_next_batch
+ Args:
+ txn (cursor): Database cursor
+ """
+ return self._simple_select_one_txn(
+ txn=txn,
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ retcols=(
+ "state_delta_stream_id",
+ "total_events_min_stream_ordering",
+ "total_events_max_stream_ordering",
+ ),
)
- # No more rooms -- complete the transaction.
- if not rooms_to_work_on:
- yield self._end_background_update("populate_stats_process_rooms")
- return 1
+ def update_stats_positions(self, positions, for_initial_processor=False):
+ """
+ Updates the stats processor positions.
- logger.info(
- "Processing the next %d rooms of %d remaining",
- len(rooms_to_work_on),
- progress["remaining"],
+ Args:
+ positions: See L{get_stats_positions}
+ for_initial_processor: See L{get_stats_positions}
+ """
+ if positions is None:
+ positions = {
+ "state_delta_stream_id": None,
+ "total_events_min_stream_ordering": None,
+ "total_events_max_stream_ordering": None,
+ }
+ return self._simple_update_one(
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ updatevalues=positions,
+ desc="update_stats_incremental_position",
)
- # Number of state events we've processed by going through each room
- processed_event_count = 0
-
- for room_id, event_count in rooms_to_work_on:
-
- current_state_ids = yield self.get_current_state_ids(room_id)
-
- join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
- history_visibility_id = current_state_ids.get(
- (EventTypes.RoomHistoryVisibility, "")
- )
- encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
- name_id = current_state_ids.get((EventTypes.Name, ""))
- topic_id = current_state_ids.get((EventTypes.Topic, ""))
- avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
- canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
-
- event_ids = [
- join_rules_id,
- history_visibility_id,
- encryption_id,
- name_id,
- topic_id,
- avatar_id,
- canonical_alias_id,
- ]
-
- state_events = yield self.get_events(
- [ev for ev in event_ids if ev is not None]
- )
-
- def _get_or_none(event_id, arg):
- event = state_events.get(event_id)
- if event:
- return event.content.get(arg)
- return None
-
- yield self.update_room_state(
- room_id,
- {
- "join_rules": _get_or_none(join_rules_id, "join_rule"),
- "history_visibility": _get_or_none(
- history_visibility_id, "history_visibility"
- ),
- "encryption": _get_or_none(encryption_id, "algorithm"),
- "name": _get_or_none(name_id, "name"),
- "topic": _get_or_none(topic_id, "topic"),
- "avatar": _get_or_none(avatar_id, "url"),
- "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
- },
- )
-
- now = self.hs.get_reactor().seconds()
-
- # quantise time to the nearest bucket
- now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
- def _fetch_data(txn):
-
- # Get the current token of the room
- current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
-
- current_state_events = len(current_state_ids)
-
- membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
-
- total_state_events = self._get_total_state_event_counts_txn(
- txn, room_id
- )
-
- self._update_stats_txn(
- txn,
- "room",
- room_id,
- now,
- {
- "bucket_size": self.stats_bucket_size,
- "current_state_events": current_state_events,
- "joined_members": membership_counts.get(Membership.JOIN, 0),
- "invited_members": membership_counts.get(Membership.INVITE, 0),
- "left_members": membership_counts.get(Membership.LEAVE, 0),
- "banned_members": membership_counts.get(Membership.BAN, 0),
- "state_events": total_state_events,
- },
- )
- self._simple_insert_txn(
- txn,
- "room_stats_earliest_token",
- {"room_id": room_id, "token": current_token},
- )
-
- # We've finished a room. Delete it from the table.
- self._simple_delete_one_txn(
- txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
- )
-
- yield self.runInteraction("update_room_stats", _fetch_data)
-
- # Update the remaining counter.
- progress["remaining"] -= 1
- yield self.runInteraction(
- "populate_stats",
- self._background_update_progress_txn,
- "populate_stats_process_rooms",
- progress,
- )
-
- processed_event_count += event_count
-
- if processed_event_count > batch_size:
- # Don't process any more rooms, we've hit our batch size.
- return processed_event_count
-
- return processed_event_count
-
- def delete_all_stats(self):
+ def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
"""
- Delete all statistics records.
+ See L{update_stats_positions}
"""
-
- def _delete_all_stats_txn(txn):
- txn.execute("DELETE FROM room_state")
- txn.execute("DELETE FROM room_stats")
- txn.execute("DELETE FROM room_stats_earliest_token")
- txn.execute("DELETE FROM user_stats")
-
- return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
-
- def get_stats_stream_pos(self):
- return self._simple_select_one_onecol(
- table="stats_stream_pos",
- keyvalues={},
- retcol="stream_id",
- desc="stats_stream_pos",
- )
-
- def update_stats_stream_pos(self, stream_id):
- return self._simple_update_one(
- table="stats_stream_pos",
- keyvalues={},
- updatevalues={"stream_id": stream_id},
- desc="update_stats_stream_pos",
+ if positions is None:
+ positions = {
+ "state_delta_stream_id": None,
+ "total_events_min_stream_ordering": None,
+ "total_events_max_stream_ordering": None,
+ }
+ return self._simple_update_one_txn(
+ txn,
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ updatevalues=positions,
)
def update_room_state(self, room_id, fields):
@@ -364,42 +189,14 @@ class StatsStore(StateDeltasStore):
fields[col] = None
return self._simple_upsert(
- table="room_state",
+ table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
)
- def get_deltas_for_room(self, room_id, start, size=100):
- """
- Get statistics deltas for a given room.
-
- Args:
- room_id (str)
- start (int): Pagination start. Number of entries, not timestamp.
- size (int): How many entries to return.
-
- Returns:
- Deferred[list[dict]], where the dict has the keys of
- ABSOLUTE_STATS_FIELDS["room"] and "ts".
- """
- return self._simple_select_list_paginate(
- "room_stats",
- {"room_id": room_id},
- "ts",
- start,
- size,
- retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
- order_direction="DESC",
- )
-
- def get_all_room_state(self):
- return self._simple_select_list(
- "room_state", None, retcols=("name", "topic", "canonical_alias")
- )
-
@cached()
- def get_earliest_token_for_room_stats(self, room_id):
+ def get_earliest_token_for_stats(self, stats_type, id):
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
@@ -409,79 +206,410 @@ class StatsStore(StateDeltasStore):
Returns:
Deferred[int]
"""
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
return self._simple_select_one_onecol(
- "room_stats_earliest_token",
- {"room_id": room_id},
- retcol="token",
+ "%s_current" % (table,),
+ {id_col: id},
+ retcol="completed_delta_stream_id",
allow_none=True,
)
- def update_stats(self, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert(
- table=table,
- keyvalues={id_col: stats_id, "ts": ts},
- values=fields,
- desc="update_stats",
- )
+ def update_stats_delta(
+ self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+ ):
+ """
+ Updates the statistics for a subject, with a delta (difference/relative
+ change).
+
+ Args:
+ ts (int): timestamp of the change
+ stats_type (str): "room" or "user" – the kind of subject
+ stats_id (str): the subject's ID (room ID or user ID)
+ fields (dict[str, int]): Deltas of stats values.
+ complete_with_stream_id (int, optional):
+ If supplied, converts an incomplete row into a complete row,
+ with the supplied stream_id marked as the stream_id where the
+ row was completed.
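+
+        Example (the room ID is illustrative): to record one additional joined
+        member at time `ts`:
+            update_stats_delta(ts, "room", "!abc:example.org",
+                               {"joined_members": +1})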
+ """
- def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert_txn(
- txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+ return self.runInteraction(
+ "update_stats_delta",
+ self._update_stats_delta_txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=complete_with_stream_id,
)
- def update_stats_delta(self, ts, stats_type, stats_id, field, value):
- def _update_stats_delta(txn):
- table, id_col = TYPE_TO_ROOM[stats_type]
-
- sql = (
- "SELECT * FROM %s"
- " WHERE %s=? and ts=("
- " SELECT MAX(ts) FROM %s"
- " WHERE %s=?"
- ")"
- ) % (table, id_col, table, id_col)
- txn.execute(sql, (stats_id, stats_id))
- rows = self.cursor_to_dict(txn)
- if len(rows) == 0:
- # silently skip as we don't have anything to apply a delta to yet.
- # this tries to minimise any race between the initial sync and
- # subsequent deltas arriving.
- return
-
- current_ts = ts
- latest_ts = rows[0]["ts"]
- if current_ts < latest_ts:
- # This one is in the past, but we're just encountering it now.
- # Mark it as part of the current bucket.
- current_ts = latest_ts
- elif ts != latest_ts:
- # we have to copy our absolute counters over to the new entry.
- values = {
- key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
- }
- values[id_col] = stats_id
- values["ts"] = ts
- values["bucket_size"] = self.stats_bucket_size
-
- self._simple_insert_txn(txn, table=table, values=values)
-
- # actually update the new value
- if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
- self._simple_update_txn(
- txn,
- table=table,
- keyvalues={id_col: stats_id, "ts": current_ts},
- updatevalues={field: value},
+ def _update_stats_delta_txn(
+ self,
+ txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=None,
+ absolute_field_overrides=None,
+ ):
+ """
+ See L{update_stats_delta}
+ Additional Args:
+ absolute_field_overrides (dict[str, int]): Current stats values
+ (i.e. not deltas) of absolute fields.
+ Does not work with per-slice fields.
+ """
+
+ if absolute_field_overrides is None:
+ absolute_field_overrides = {}
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
+ quantised_ts = self.quantise_stats_time(int(ts))
+ end_ts = quantised_ts + self.stats_bucket_size
+
+ abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+ slice_field_names = PER_SLICE_FIELDS[stats_type]
+ for field in chain(fields.keys(), absolute_field_overrides.keys()):
+ if field not in abs_field_names and field not in slice_field_names:
+ # guard against potential SQL injection dodginess
+ raise ValueError(
+ "%s is not a recognised field"
+ " for stats type %s" % (field, stats_type)
)
+
+ # only absolute stats fields are tracked in the `_current` stats tables,
+ # so those are the only ones that we process deltas for when
+ # we upsert against the `_current` table.
+
+ # This calculates the deltas (`field = field + ?` values)
+ # for absolute fields,
+ # * defaulting to 0 if not specified
+ # (required for the INSERT part of upserting to work)
+ # * omitting overrides specified in `absolute_field_overrides`
+ deltas_of_absolute_fields = {
+ key: fields.get(key, 0)
+ for key in abs_field_names
+ if key not in absolute_field_overrides
+ }
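+        # e.g. (illustrative) for stats_type "room", fields={"joined_members": 1}
+        # and absolute_field_overrides={"current_state_events": 10}, this yields
+        # {"joined_members": 1, "invited_members": 0, ...}, with
+        # "current_state_events" omitted since it is set absolutely below.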
+
+ if complete_with_stream_id is not None:
+ absolute_field_overrides = absolute_field_overrides.copy()
+ absolute_field_overrides[
+ "completed_delta_stream_id"
+ ] = complete_with_stream_id
+
+ # first upsert the `_current` table
+ self._upsert_with_additive_relatives_txn(
+ txn=txn,
+ table=table + "_current",
+ keyvalues={id_col: stats_id},
+ absolutes=absolute_field_overrides,
+ additive_relatives=deltas_of_absolute_fields,
+ )
+
+ if self.has_completed_background_updates():
+ # TODO want to check specifically for stats regenerator, not all
+ # background updates…
+ # then upsert the `_historical` table.
+ # we don't support absolute_fields for per-slice fields as it makes
+ # no sense.
+ per_slice_additive_relatives = {
+ key: fields.get(key, 0) for key in slice_field_names
+ }
+ self._upsert_copy_from_table_with_additive_relatives_txn(
+ txn=txn,
+ into_table=table + "_historical",
+ keyvalues={id_col: stats_id},
+ extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
+ extra_dst_keyvalues={"end_ts": end_ts},
+ additive_relatives=per_slice_additive_relatives,
+ src_table=table + "_current",
+ copy_columns=abs_field_names,
+ additional_where=" AND completed_delta_stream_id IS NOT NULL",
+ )
+
+ def _upsert_with_additive_relatives_txn(
+ self, txn, table, keyvalues, absolutes, additive_relatives
+ ):
+ """Used to update values in the stats tables.
+
+ Args:
+ txn: Transaction
+ table (str): Table name
+ keyvalues (dict[str, any]): Row-identifying key values
+ absolutes (dict[str, any]): Absolute (set) fields
+ additive_relatives (dict[str, int]): Fields that will be added onto
+ if existing row present.
+ """
+ if self.database_engine.can_native_upsert:
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+ qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+ "table": table,
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "key_columns": ", ".join(keyvalues),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
+
+ txn.execute(sql, qargs)
+ else:
+ self.database_engine.lock_table(txn, table)
+ retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+ current_row = self._simple_select_one_txn(
+ txn, table, keyvalues, retcols, allow_none=True
+ )
+ if current_row is None:
+ merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+ self._simple_insert_txn(txn, table, merged_dict)
else:
- sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
- table,
- field,
- field,
- id_col,
+ for (key, val) in additive_relatives.items():
+ current_row[key] += val
+ current_row.update(absolutes)
+ self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+ def _upsert_copy_from_table_with_additive_relatives_txn(
+ self,
+ txn,
+ into_table,
+ keyvalues,
+ extra_dst_keyvalues,
+ extra_dst_insvalues,
+ additive_relatives,
+ src_table,
+ copy_columns,
+ additional_where="",
+ ):
+ """
+ Args:
+ txn: Transaction
+ into_table (str): The destination table to UPSERT the row into
+ keyvalues (dict[str, any]): Row-identifying key values
+ extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+ for `into_table`.
+ extra_dst_insvalues (dict[str, any]): Additional values to insert
+ on new row creation for `into_table`.
+ additive_relatives (dict[str, any]): Fields that will be added onto
+ if existing row present. (Must be disjoint from copy_columns.)
+ src_table (str): The source table to copy from
+ copy_columns (iterable[str]): The list of columns to copy
+ additional_where (str): Additional SQL for where (prefix with AND
+ if using).
+ """
+ if self.database_engine.can_native_upsert:
+ ins_columns = chain(
+ keyvalues,
+ copy_columns,
+ additive_relatives,
+ extra_dst_keyvalues,
+ extra_dst_insvalues,
+ )
+ sel_exprs = chain(
+ keyvalues,
+ copy_columns,
+ (
+ "?"
+ for _ in chain(
+ additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
+ )
+ ),
+ )
+ keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+ sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+ sets_ar = (
+ "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+ for f in additive_relatives
+ )
+
+ sql = """
+ INSERT INTO %(into_table)s (%(ins_columns)s)
+ SELECT %(sel_exprs)s
+ FROM %(src_table)s
+ WHERE %(keyvalues_where)s %(additional_where)s
+ ON CONFLICT (%(keyvalues)s)
+ DO UPDATE SET %(sets)s
+ """ % {
+ "into_table": into_table,
+ "ins_columns": ", ".join(ins_columns),
+ "sel_exprs": ", ".join(sel_exprs),
+ "keyvalues_where": " AND ".join(keyvalues_where),
+ "src_table": src_table,
+ "keyvalues": ", ".join(
+ chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+ ),
+ "sets": ", ".join(chain(sets_cc, sets_ar)),
+ "additional_where": additional_where,
+ }
+
+ qargs = list(
+ chain(
+ additive_relatives.values(),
+ extra_dst_keyvalues.values(),
+ extra_dst_insvalues.values(),
+ keyvalues.values(),
)
- txn.execute(sql, (value, stats_id, current_ts))
+ )
+ txn.execute(sql, qargs)
+ else:
+ self.database_engine.lock_table(txn, into_table)
+ src_row = self._simple_select_one_txn(
+ txn, src_table, keyvalues, copy_columns
+ )
+ dest_current_row = self._simple_select_one_txn(
+ txn,
+ into_table,
+ keyvalues,
+ retcols=list(chain(additive_relatives.keys(), copy_columns)),
+ allow_none=True,
+ )
+
+ if dest_current_row is None:
+ merged_dict = {
+ **keyvalues,
+ **extra_dst_keyvalues,
+ **extra_dst_insvalues,
+ **src_row,
+ **additive_relatives,
+ }
+ self._simple_insert_txn(txn, into_table, merged_dict)
+ else:
+ for (key, val) in additive_relatives.items():
+ src_row[key] = dest_current_row[key] + val
+ self._simple_update_txn(txn, into_table, keyvalues, src_row)
+
+ def incremental_update_room_total_events_and_bytes(self, in_positions):
+ """
+ Counts the number of events and total event bytes per-room and then adds
+ these to the respective total_events and total_event_bytes room counts.
+
+ Args:
+ in_positions (dict): Positions,
+ as retrieved from L{get_stats_positions}.
+
+ Returns (Deferred[tuple[dict, bool]]):
+ First element (dict):
+ The new positions. Note that this is for reference only –
+ the new positions WILL be committed by this function.
+ Second element (bool):
+ true iff there was a change to the positions, false otherwise
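+
+        Example (illustrative caller; `old_positions` as returned by
+        L{get_stats_positions}):
+            new_positions, changed = yield (
+                self.incremental_update_room_total_events_and_bytes(old_positions)
+            )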
+ """
+
+ def incremental_update_total_events_and_bytes_txn(txn):
+ positions = in_positions.copy()
- return self.runInteraction("update_stats_delta", _update_stats_delta)
+ max_pos = self.get_room_max_stream_ordering()
+ min_pos = self.get_room_min_stream_ordering()
+ self.update_total_event_and_bytes_count_between_txn(
+ txn,
+ low_pos=positions["total_events_max_stream_ordering"],
+ high_pos=max_pos,
+ )
+
+ self.update_total_event_and_bytes_count_between_txn(
+ txn,
+ low_pos=min_pos,
+ high_pos=positions["total_events_min_stream_ordering"],
+ )
+
+ if (
+ positions["total_events_max_stream_ordering"] != max_pos
+ or positions["total_events_min_stream_ordering"] != min_pos
+ ):
+ positions["total_events_max_stream_ordering"] = max_pos
+ positions["total_events_min_stream_ordering"] = min_pos
+
+ self._update_stats_positions_txn(txn, positions)
+
+ return positions, True
+ else:
+ return positions, False
+
+ return self.runInteraction(
+ "stats_incremental_total_events_and_bytes",
+ incremental_update_total_events_and_bytes_txn,
+ )
+
+ def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
+ """
+ Updates the total_events and total_event_bytes counts for rooms,
+ in a range of stream_orderings.
+
+ Inclusivity of low_pos and high_pos is dependent upon their signs.
+ This makes it intuitive to use this function for both backfilled
+ and non-backfilled events.
+
+ Examples:
+ (low, high) → (kind)
+        (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+ (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
+ (-7, 7) → -7 <= … <= 7 (both)
+
+ Args:
+ txn: Database transaction.
+ low_pos: Low stream ordering
+ high_pos: High stream ordering
+ """
+
+ if low_pos >= high_pos:
+ # nothing to do here.
+ return
+
+        now = self.hs.get_clock().time_msec()
+
+ # we choose comparators based on the signs
+ low_comparator = "<=" if low_pos < 0 else "<"
+ high_comparator = "<" if high_pos < 0 else "<="
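+        # e.g. (low_pos, high_pos) == (-4, -2) gives "<=" and "<", i.e.
+        # -4 <= stream_ordering < -2, matching the backfilled example above.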
+
+ if isinstance(self.database_engine, PostgresEngine):
+ new_bytes_expression = "OCTET_LENGTH(json)"
+ else:
+ new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+ sql = """
+ SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+ FROM events INNER JOIN event_json USING (event_id)
+ WHERE ? %s stream_ordering AND stream_ordering %s ?
+ GROUP BY room_id
+ """ % (
+            new_bytes_expression,
+            low_comparator,
+            high_comparator,
+ )
+
+ txn.execute(sql, (low_pos, high_pos))
+
+ for room_id, new_events, new_bytes in txn.fetchall():
+ self._update_stats_delta_txn(
+ txn,
+ now,
+ "room",
+ room_id,
+ {"total_events": new_events, "total_event_bytes": new_bytes},
+ )
|