diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index c4474df975..4e06938849 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1230,7 +1230,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
"SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
)
- (age,) = txn.fetchone()
+ (received_ts,) = txn.fetchone()
+
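+            # `received_ts` is the timestamp of the oldest staged event (or 0
+            # when the staging area is empty, via the COALESCE above); report
+            # the queue's age relative to the current time.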
+ age = self._clock.time_msec() - received_ts
return count, age
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 897fa06639..08c580b0dc 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1580,11 +1580,11 @@ class PersistEventsStore:
# invalidate the cache for the redacted event
txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
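+            # A redaction may be persisted more than once (for instance if the
+            # event is seen again over federation), so upsert on the
+            # redaction's event_id rather than doing a plain insert, which
+            # would fail on the duplicate row.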
- self.db_pool.simple_insert_txn(
+ self.db_pool.simple_upsert_txn(
txn,
table="redactions",
+ keyvalues={"event_id": event.event_id},
values={
- "event_id": event.event_id,
"redacts": event.redacts,
"received_ts": self._clock.time_msec(),
},
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index c3f551d377..e3a544d9b2 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -320,7 +320,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
"""
Returns millisecond unixtime for start of UTC day.
"""
- now = time.gmtime()
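+        # Use the homeserver's clock rather than the ambient system time, so
+        # that the notion of "today" can be controlled by tests which mock the
+        # clock.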
+ now = time.gmtime(self._clock.time())
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
return today_start * 1000
@@ -352,7 +352,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
) udv
ON u.user_id = udv.user_id AND u.device_id=udv.device_id
INNER JOIN users ON users.name=u.user_id
- WHERE last_seen > ? AND last_seen <= ?
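+            -- the window is half-open: start <= last_seen < end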
+ WHERE ? <= last_seen AND last_seen < ?
AND udv.timestamp IS NULL AND users.is_guest=0
AND users.appservice_id IS NULL
GROUP BY u.user_id, u.device_id
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 7fb7780d0f..eb4841830d 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -215,6 +215,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"event_relations",
"event_search",
"rejections",
+ "redactions",
):
logger.info("[purge] removing events from %s", table)
@@ -392,7 +393,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"room_memberships",
"room_stats_state",
"room_stats_current",
- "room_stats_historical",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 9f0d64a325..6ddafe5434 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -25,6 +25,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchStore
+from synapse.storage.types import Cursor
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -1022,10 +1023,22 @@ class RoomWorkerStore(SQLBaseStore):
)
-class RoomBackgroundUpdateStore(SQLBaseStore):
+class _BackgroundUpdates:
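+    """Names of the background updates registered by this module."""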
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
+ POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
+ REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
+
+
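+# SQL commands which finish the room_depth migration: drop the trigger and
+# function which kept `min_depth2` in sync with `min_depth`, then swap
+# `min_depth2` into place as the new `min_depth` column.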
+_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
+ "DROP TRIGGER populate_min_depth2_trigger ON room_depth",
+ "DROP FUNCTION populate_min_depth2()",
+ "ALTER TABLE room_depth DROP COLUMN min_depth",
+ "ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth",
+)
+
+
+class RoomBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
@@ -1037,15 +1050,25 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
)
self.db_pool.updates.register_background_update_handler(
- self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
+ _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
self._remove_tombstoned_rooms_from_directory,
)
self.db_pool.updates.register_background_update_handler(
- self.ADD_ROOMS_ROOM_VERSION_COLUMN,
+ _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
self._background_add_rooms_room_version_column,
)
+ # BG updates to change the type of room_depth.min_depth
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
+ self._background_populate_room_depth_min_depth2,
+ )
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
+ self._background_replace_room_depth_min_depth,
+ )
+
async def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of
them into the room_retention table.
@@ -1164,7 +1187,9 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
new_last_room_id = room_id
self.db_pool.updates._background_update_progress_txn(
- txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id}
+ txn,
+ _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
+ {"room_id": new_last_room_id},
)
return False
@@ -1176,7 +1201,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
if end:
await self.db_pool.updates._end_background_update(
- self.ADD_ROOMS_ROOM_VERSION_COLUMN
+ _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN
)
return batch_size
@@ -1215,7 +1240,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
if not rooms:
await self.db_pool.updates._end_background_update(
- self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
+ _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
)
return 0
@@ -1224,7 +1249,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
await self.set_room_is_public(room_id, False)
await self.db_pool.updates._background_update_progress(
- self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
+ _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
)
return len(rooms)
@@ -1268,6 +1293,71 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
return max_ordering is None
+ async def _background_populate_room_depth_min_depth2(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Populate room_depth.min_depth2
+
+ This is to deal with the fact that min_depth was initially created as a
+ 32-bit integer field.
+ """
+
+ def process(txn: Cursor) -> int:
+ last_room = progress.get("last_room", "")
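+            # Copy min_depth into min_depth2 for the next batch of rooms (in
+            # room_id order); RETURNING tells us which rooms were updated so
+            # that progress can be recorded.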
+ txn.execute(
+ """
+ UPDATE room_depth SET min_depth2=min_depth
+ WHERE room_id IN (
+ SELECT room_id FROM room_depth WHERE room_id > ?
+ ORDER BY room_id LIMIT ?
+ )
+ RETURNING room_id;
+ """,
+ (last_room, batch_size),
+ )
+ row_count = txn.rowcount
+ if row_count == 0:
+ return 0
+ last_room = max(row[0] for row in txn)
+ logger.info("populated room_depth up to %s", last_room)
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
+ {"last_room": last_room},
+ )
+ return row_count
+
+ result = await self.db_pool.runInteraction(
+ "_background_populate_min_depth2", process
+ )
+
+ if result != 0:
+ return result
+
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2
+ )
+ return 0
+
+ async def _background_replace_room_depth_min_depth(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Drop the old 'min_depth' column and rename 'min_depth2' into its place."""
+
+ def process(txn: Cursor) -> None:
+ for sql in _REPLACE_ROOM_DEPTH_SQL_COMMANDS:
+ logger.info("completing room_depth migration: %s", sql)
+ txn.execute(sql)
+
+ await self.db_pool.runInteraction("_background_replace_room_depth", process)
+
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
+ )
+
+ return 0
+
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def __init__(self, database: DatabasePool, db_conn, hs):
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 2796354a1f..4d82c4c26d 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -703,13 +703,22 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(max_entries=10000)
async def is_host_joined(self, room_id: str, host: str) -> bool:
+ return await self._check_host_room_membership(room_id, host, Membership.JOIN)
+
+ @cached(max_entries=10000)
+ async def is_host_invited(self, room_id: str, host: str) -> bool:
+ return await self._check_host_room_membership(room_id, host, Membership.INVITE)
+
+ async def _check_host_room_membership(
+ self, room_id: str, host: str, membership: str
+ ) -> bool:
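+        """Test whether the given host has at least one user with the given
+        membership in the room, based on the current state."""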
if "%" in host or "_" in host:
raise Exception("Invalid host name")
sql = """
SELECT state_key FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (event_id)
- WHERE m.membership = 'join'
+ WHERE m.membership = ?
AND type = 'm.room.member'
AND c.room_id = ?
AND state_key LIKE ?
@@ -722,7 +731,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
like_clause = "%:" + host
rows = await self.db_pool.execute(
- "is_host_joined", None, sql, room_id, like_clause
+ "is_host_joined", None, sql, membership, room_id, like_clause
)
if not rows:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 82a1833509..59d67c255b 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -26,7 +26,6 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import StoreError
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.state_deltas import StateDeltasStore
-from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
@@ -49,14 +48,6 @@ ABSOLUTE_STATS_FIELDS = {
"user": ("joined_rooms",),
}
-# these fields are per-timeslice and so should be reset to 0 upon a new slice
-# You can draw these stats on a histogram.
-# Example: number of events sent locally during a time slice
-PER_SLICE_FIELDS = {
- "room": ("total_events", "total_event_bytes"),
- "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
-}
-
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
# these are the tables (& ID columns) which contain our actual subjects
@@ -106,7 +97,6 @@ class StatsStore(StateDeltasStore):
self.server_name = hs.hostname
self.clock = self.hs.get_clock()
self.stats_enabled = hs.config.stats_enabled
- self.stats_bucket_size = hs.config.stats_bucket_size
self.stats_delta_processing_lock = DeferredLock()
@@ -122,22 +112,6 @@ class StatsStore(StateDeltasStore):
self.db_pool.updates.register_noop_background_update("populate_stats_cleanup")
self.db_pool.updates.register_noop_background_update("populate_stats_prepare")
- def quantise_stats_time(self, ts):
- """
- Quantises a timestamp to be a multiple of the bucket size.
-
- Args:
- ts (int): the timestamp to quantise, in milliseconds since the Unix
- Epoch
-
- Returns:
- int: a timestamp which
- - is divisible by the bucket size;
- - is no later than `ts`; and
- - is the largest such timestamp.
- """
- return (ts // self.stats_bucket_size) * self.stats_bucket_size
-
async def _populate_stats_process_users(self, progress, batch_size):
"""
This is a background update which regenerates statistics for users.
@@ -288,56 +262,6 @@ class StatsStore(StateDeltasStore):
desc="update_room_state",
)
- async def get_statistics_for_subject(
- self, stats_type: str, stats_id: str, start: str, size: int = 100
- ) -> List[dict]:
- """
- Get statistics for a given subject.
-
- Args:
- stats_type: The type of subject
- stats_id: The ID of the subject (e.g. room_id or user_id)
- start: Pagination start. Number of entries, not timestamp.
- size: How many entries to return.
-
- Returns:
- A list of dicts, where the dict has the keys of
- ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
- """
- return await self.db_pool.runInteraction(
- "get_statistics_for_subject",
- self._get_statistics_for_subject_txn,
- stats_type,
- stats_id,
- start,
- size,
- )
-
- def _get_statistics_for_subject_txn(
- self, txn, stats_type, stats_id, start, size=100
- ):
- """
- Transaction-bound version of L{get_statistics_for_subject}.
- """
-
- table, id_col = TYPE_TO_TABLE[stats_type]
- selected_columns = list(
- ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
- )
-
- slice_list = self.db_pool.simple_select_list_paginate_txn(
- txn,
- table + "_historical",
- "end_ts",
- start,
- size,
- retcols=selected_columns + ["bucket_size", "end_ts"],
- keyvalues={id_col: stats_id},
- order_direction="DESC",
- )
-
- return slice_list
-
@cached()
async def get_earliest_token_for_stats(
self, stats_type: str, id: str
@@ -451,14 +375,10 @@ class StatsStore(StateDeltasStore):
table, id_col = TYPE_TO_TABLE[stats_type]
- quantised_ts = self.quantise_stats_time(int(ts))
- end_ts = quantised_ts + self.stats_bucket_size
-
# Lets be paranoid and check that all the given field names are known
abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
- slice_field_names = PER_SLICE_FIELDS[stats_type]
for field in chain(fields.keys(), absolute_field_overrides.keys()):
- if field not in abs_field_names and field not in slice_field_names:
+ if field not in abs_field_names:
# guard against potential SQL injection dodginess
raise ValueError(
"%s is not a recognised field"
@@ -491,20 +411,6 @@ class StatsStore(StateDeltasStore):
additive_relatives=deltas_of_absolute_fields,
)
- per_slice_additive_relatives = {
- key: fields.get(key, 0) for key in slice_field_names
- }
- self._upsert_copy_from_table_with_additive_relatives_txn(
- txn=txn,
- into_table=table + "_historical",
- keyvalues={id_col: stats_id},
- extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
- extra_dst_keyvalues={"end_ts": end_ts},
- additive_relatives=per_slice_additive_relatives,
- src_table=table + "_current",
- copy_columns=abs_field_names,
- )
-
def _upsert_with_additive_relatives_txn(
self, txn, table, keyvalues, absolutes, additive_relatives
):
@@ -528,7 +434,7 @@ class StatsStore(StateDeltasStore):
]
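+            # The stored value may be NULL (for example where a counter column
+            # was added after the row was created), so coalesce it to zero
+            # before adding the delta.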
relative_updates = [
- "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+ "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]
@@ -568,205 +474,13 @@ class StatsStore(StateDeltasStore):
self.db_pool.simple_insert_txn(txn, table, merged_dict)
else:
for (key, val) in additive_relatives.items():
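+                    # As with the native upsert above, treat a stored NULL (None) as zero.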
- current_row[key] += val
+ if current_row[key] is None:
+ current_row[key] = val
+ else:
+ current_row[key] += val
current_row.update(absolutes)
self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
- def _upsert_copy_from_table_with_additive_relatives_txn(
- self,
- txn,
- into_table,
- keyvalues,
- extra_dst_keyvalues,
- extra_dst_insvalues,
- additive_relatives,
- src_table,
- copy_columns,
- ):
- """Updates the historic stats table with latest updates.
-
- This involves copying "absolute" fields from the `_current` table, and
- adding relative fields to any existing values.
-
- Args:
- txn: Transaction
- into_table (str): The destination table to UPSERT the row into
- keyvalues (dict[str, any]): Row-identifying key values
- extra_dst_keyvalues (dict[str, any]): Additional keyvalues
- for `into_table`.
- extra_dst_insvalues (dict[str, any]): Additional values to insert
- on new row creation for `into_table`.
- additive_relatives (dict[str, any]): Fields that will be added onto
- if existing row present. (Must be disjoint from copy_columns.)
- src_table (str): The source table to copy from
- copy_columns (iterable[str]): The list of columns to copy
- """
- if self.database_engine.can_native_upsert:
- ins_columns = chain(
- keyvalues,
- copy_columns,
- additive_relatives,
- extra_dst_keyvalues,
- extra_dst_insvalues,
- )
- sel_exprs = chain(
- keyvalues,
- copy_columns,
- (
- "?"
- for _ in chain(
- additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
- )
- ),
- )
- keyvalues_where = ("%s = ?" % f for f in keyvalues)
-
- sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
- sets_ar = (
- "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
- for f in additive_relatives
- )
-
- sql = """
- INSERT INTO %(into_table)s (%(ins_columns)s)
- SELECT %(sel_exprs)s
- FROM %(src_table)s
- WHERE %(keyvalues_where)s
- ON CONFLICT (%(keyvalues)s)
- DO UPDATE SET %(sets)s
- """ % {
- "into_table": into_table,
- "ins_columns": ", ".join(ins_columns),
- "sel_exprs": ", ".join(sel_exprs),
- "keyvalues_where": " AND ".join(keyvalues_where),
- "src_table": src_table,
- "keyvalues": ", ".join(
- chain(keyvalues.keys(), extra_dst_keyvalues.keys())
- ),
- "sets": ", ".join(chain(sets_cc, sets_ar)),
- }
-
- qargs = list(
- chain(
- additive_relatives.values(),
- extra_dst_keyvalues.values(),
- extra_dst_insvalues.values(),
- keyvalues.values(),
- )
- )
- txn.execute(sql, qargs)
- else:
- self.database_engine.lock_table(txn, into_table)
- src_row = self.db_pool.simple_select_one_txn(
- txn, src_table, keyvalues, copy_columns
- )
- all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
- dest_current_row = self.db_pool.simple_select_one_txn(
- txn,
- into_table,
- keyvalues=all_dest_keyvalues,
- retcols=list(chain(additive_relatives.keys(), copy_columns)),
- allow_none=True,
- )
-
- if dest_current_row is None:
- merged_dict = {
- **keyvalues,
- **extra_dst_keyvalues,
- **extra_dst_insvalues,
- **src_row,
- **additive_relatives,
- }
- self.db_pool.simple_insert_txn(txn, into_table, merged_dict)
- else:
- for (key, val) in additive_relatives.items():
- src_row[key] = dest_current_row[key] + val
- self.db_pool.simple_update_txn(
- txn, into_table, all_dest_keyvalues, src_row
- )
-
- async def get_changes_room_total_events_and_bytes(
- self, min_pos: int, max_pos: int
- ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
- """Fetches the counts of events in the given range of stream IDs.
-
- Args:
- min_pos
- max_pos
-
- Returns:
- Mapping of room ID to field changes.
- """
-
- return await self.db_pool.runInteraction(
- "stats_incremental_total_events_and_bytes",
- self.get_changes_room_total_events_and_bytes_txn,
- min_pos,
- max_pos,
- )
-
- def get_changes_room_total_events_and_bytes_txn(
- self, txn, low_pos: int, high_pos: int
- ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
- """Gets the total_events and total_event_bytes counts for rooms and
- senders, in a range of stream_orderings (including backfilled events).
-
- Args:
- txn
- low_pos: Low stream ordering
- high_pos: High stream ordering
-
- Returns:
- The room and user deltas for total_events/total_event_bytes in the
- format of `stats_id` -> fields
- """
-
- if low_pos >= high_pos:
- # nothing to do here.
- return {}, {}
-
- if isinstance(self.database_engine, PostgresEngine):
- new_bytes_expression = "OCTET_LENGTH(json)"
- else:
- new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
-
- sql = """
- SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
- FROM events INNER JOIN event_json USING (event_id)
- WHERE (? < stream_ordering AND stream_ordering <= ?)
- OR (? <= stream_ordering AND stream_ordering <= ?)
- GROUP BY events.room_id
- """ % (
- new_bytes_expression,
- )
-
- txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
- room_deltas = {
- room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
- for room_id, new_events, new_bytes in txn
- }
-
- sql = """
- SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
- FROM events INNER JOIN event_json USING (event_id)
- WHERE (? < stream_ordering AND stream_ordering <= ?)
- OR (? <= stream_ordering AND stream_ordering <= ?)
- GROUP BY events.sender
- """ % (
- new_bytes_expression,
- )
-
- txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
- user_deltas = {
- user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
- for user_id, new_events, new_bytes in txn
- if self.hs.is_mine_id(user_id)
- }
-
- return room_deltas, user_deltas
-
async def _calculate_and_set_initial_state_for_room(
self, room_id: str
) -> Tuple[dict, dict, int]:
@@ -893,6 +607,7 @@ class StatsStore(StateDeltasStore):
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
+ "knocked_members": membership_counts.get(Membership.KNOCK, 0),
"local_users_in_room": len(local_users_in_room),
},
)
|