diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..7b1d1b4203 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -15,14 +15,7 @@
import logging
-from twisted.internet import defer
-
-from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
-from synapse.metrics import event_processing_positions
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import UserID
-from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -49,9 +42,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
- # Guard to ensure we only process deltas one at a time
- self._is_processing = False
-
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -62,272 +52,4 @@ class StatsHandler(StateDeltasHandler):
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
- if not self.hs.config.stats_enabled:
- return
-
- if self._is_processing:
- return
-
- @defer.inlineCallbacks
- def process():
- try:
- yield self._unsafe_process()
- finally:
- self._is_processing = False
-
- self._is_processing = True
- run_as_background_process("stats.notify_new_event", process)
-
- @defer.inlineCallbacks
- def _unsafe_process(self):
- # If self.pos is None then means we haven't fetched it from DB
- if self.pos is None:
- self.pos = yield self.store.get_stats_stream_pos()
-
- # If still None then the initial background update hasn't happened yet
- if self.pos is None:
- return None
-
- # Loop round handling deltas until we're up to date
- while True:
- with Measure(self.clock, "stats_delta"):
- deltas = yield self.store.get_current_state_deltas(self.pos)
- if not deltas:
- return
-
- logger.info("Handling %d state deltas", len(deltas))
- yield self._handle_deltas(deltas)
-
- self.pos = deltas[-1]["stream_id"]
- yield self.store.update_stats_stream_pos(self.pos)
-
- event_processing_positions.labels("stats").set(self.pos)
-
- @defer.inlineCallbacks
- def _handle_deltas(self, deltas):
- """
- Called with the state deltas to process
- """
- for delta in deltas:
- typ = delta["type"]
- state_key = delta["state_key"]
- room_id = delta["room_id"]
- event_id = delta["event_id"]
- stream_id = delta["stream_id"]
- prev_event_id = delta["prev_event_id"]
- stream_pos = delta["stream_id"]
-
- logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
-
- token = yield self.store.get_earliest_token_for_room_stats(room_id)
-
- # If the earliest token to begin from is larger than our current
- # stream ID, skip processing this delta.
- if token is not None and token >= stream_id:
- logger.debug(
- "Ignoring: %s as earlier than this room's initial ingestion event",
- event_id,
- )
- continue
-
- if event_id is None and prev_event_id is None:
- # Errr...
- continue
-
- event_content = {}
-
- if event_id is not None:
- event = yield self.store.get_event(event_id, allow_none=True)
- if event:
- event_content = event.content or {}
-
- # We use stream_pos here rather than fetch by event_id as event_id
- # may be None
- now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
-
- # quantise time to the nearest bucket
- now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
-
- if typ == EventTypes.Member:
- # we could use _get_key_change here but it's a bit inefficient
- # given we're not testing for a specific result; might as well
- # just grab the prev_membership and membership strings and
- # compare them.
- prev_event_content = {}
- if prev_event_id is not None:
- prev_event = yield self.store.get_event(
- prev_event_id, allow_none=True
- )
- if prev_event:
- prev_event_content = prev_event.content
-
- membership = event_content.get("membership", Membership.LEAVE)
- prev_membership = prev_event_content.get("membership", Membership.LEAVE)
-
- if prev_membership == membership:
- continue
-
- if prev_membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "joined_members", -1
- )
- elif prev_membership == Membership.INVITE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "invited_members", -1
- )
- elif prev_membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "left_members", -1
- )
- elif prev_membership == Membership.BAN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "banned_members", -1
- )
- else:
- err = "%s is not a valid prev_membership" % (repr(prev_membership),)
- logger.error(err)
- raise ValueError(err)
-
- if membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "joined_members", +1
- )
- elif membership == Membership.INVITE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "invited_members", +1
- )
- elif membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "left_members", +1
- )
- elif membership == Membership.BAN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "banned_members", +1
- )
- else:
- err = "%s is not a valid membership" % (repr(membership),)
- logger.error(err)
- raise ValueError(err)
-
- user_id = state_key
- if self.is_mine_id(user_id):
- # update user_stats as it's one of our users
- public = yield self._is_public_room(room_id)
-
- if membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now,
- "user",
- user_id,
- "public_rooms" if public else "private_rooms",
- -1,
- )
- elif membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now,
- "user",
- user_id,
- "public_rooms" if public else "private_rooms",
- +1,
- )
-
- elif typ == EventTypes.Create:
- # Newly created room. Add it with all blank portions.
- yield self.store.update_room_state(
- room_id,
- {
- "join_rules": None,
- "history_visibility": None,
- "encryption": None,
- "name": None,
- "topic": None,
- "avatar": None,
- "canonical_alias": None,
- },
- )
-
- elif typ == EventTypes.JoinRules:
- yield self.store.update_room_state(
- room_id, {"join_rules": event_content.get("join_rule")}
- )
-
- is_public = yield self._get_key_change(
- prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
- )
- if is_public is not None:
- yield self.update_public_room_stats(now, room_id, is_public)
-
- elif typ == EventTypes.RoomHistoryVisibility:
- yield self.store.update_room_state(
- room_id,
- {"history_visibility": event_content.get("history_visibility")},
- )
-
- is_public = yield self._get_key_change(
- prev_event_id, event_id, "history_visibility", "world_readable"
- )
- if is_public is not None:
- yield self.update_public_room_stats(now, room_id, is_public)
-
- elif typ == EventTypes.Encryption:
- yield self.store.update_room_state(
- room_id, {"encryption": event_content.get("algorithm")}
- )
- elif typ == EventTypes.Name:
- yield self.store.update_room_state(
- room_id, {"name": event_content.get("name")}
- )
- elif typ == EventTypes.Topic:
- yield self.store.update_room_state(
- room_id, {"topic": event_content.get("topic")}
- )
- elif typ == EventTypes.RoomAvatar:
- yield self.store.update_room_state(
- room_id, {"avatar": event_content.get("url")}
- )
- elif typ == EventTypes.CanonicalAlias:
- yield self.store.update_room_state(
- room_id, {"canonical_alias": event_content.get("alias")}
- )
-
- @defer.inlineCallbacks
- def update_public_room_stats(self, ts, room_id, is_public):
- """
- Increment/decrement a user's number of public rooms when a room they are
- in changes to/from public visibility.
-
- Args:
- ts (int): Timestamp in seconds
- room_id (str)
- is_public (bool)
- """
- # For now, blindly iterate over all local users in the room so that
- # we can handle the whole problem of copying buckets over as needed
- user_ids = yield self.store.get_users_in_room(room_id)
-
- for user_id in user_ids:
- if self.hs.is_mine(UserID.from_string(user_id)):
- yield self.store.update_stats_delta(
- ts, "user", user_id, "public_rooms", +1 if is_public else -1
- )
- yield self.store.update_stats_delta(
- ts, "user", user_id, "private_rooms", -1 if is_public else +1
- )
-
- @defer.inlineCallbacks
- def _is_public_room(self, room_id):
- join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
- history_visibility = yield self.state.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility
- )
-
- if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
- (
- history_visibility
- and history_visibility.content.get("history_visibility")
- == "world_readable"
- )
- ):
- return True
- else:
- return False
+ pass
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
new file mode 100644
index 0000000000..5b125d17b0
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -0,0 +1,33 @@
+/* Copyright 2018 New Vector Ltd
+ * Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+----- First clean up from previous versions of room stats.
+
+-- Drop the tables (and temporary staging tables) used by the old stats code
+DROP TABLE IF EXISTS room_stats;
+DROP TABLE IF EXISTS user_stats;
+DROP TABLE IF EXISTS room_stats_earliest_token;
+DROP TABLE IF EXISTS _temp_populate_stats_position;
+DROP TABLE IF EXISTS _temp_populate_stats_rooms;
+DROP TABLE IF EXISTS stats_stream_pos;
+
+-- Unschedule old background updates if they're still scheduled
+DELETE FROM background_updates WHERE update_name IN (
+ 'populate_stats_createtables',
+ 'populate_stats_process_rooms',
+ 'populate_stats_cleanup'
+);
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..38a7fa71d9 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -15,12 +15,7 @@
import logging
-from twisted.internet import defer
-
-from synapse.api.constants import EventTypes, Membership
-from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
-from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
@@ -39,8 +34,6 @@ ABSOLUTE_STATS_FIELDS = {
TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
-TEMP_TABLE = "_temp_populate_stats"
-
class StatsStore(StateDeltasStore):
def __init__(self, db_conn, hs):
@@ -51,291 +44,14 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
- self.register_background_update_handler(
- "populate_stats_createtables", self._populate_stats_createtables
- )
- self.register_background_update_handler(
- "populate_stats_process_rooms", self._populate_stats_process_rooms
+ self.register_noop_background_update(
+ "populate_stats_createtables"
)
- self.register_background_update_handler(
- "populate_stats_cleanup", self._populate_stats_cleanup
- )
-
- @defer.inlineCallbacks
- def _populate_stats_createtables(self, progress, batch_size):
-
- if not self.stats_enabled:
- yield self._end_background_update("populate_stats_createtables")
- return 1
-
- # Get all the rooms that we want to process.
- def _make_staging_area(txn):
- # Create the temporary tables
- stmts = get_statements(
- """
- -- We just recreate the table, we'll be reinserting the
- -- correct entries again later anyway.
- DROP TABLE IF EXISTS {temp}_rooms;
-
- CREATE TABLE IF NOT EXISTS {temp}_rooms(
- room_id TEXT NOT NULL,
- events BIGINT NOT NULL
- );
-
- CREATE INDEX {temp}_rooms_events
- ON {temp}_rooms(events);
- CREATE INDEX {temp}_rooms_id
- ON {temp}_rooms(room_id);
- """.format(
- temp=TEMP_TABLE
- ).splitlines()
- )
-
- for statement in stmts:
- txn.execute(statement)
-
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_position(position TEXT NOT NULL)"
- )
- txn.execute(sql)
-
- # Get rooms we want to process from the database, only adding
- # those that we haven't (i.e. those not in room_stats_earliest_token)
- sql = """
- INSERT INTO %s_rooms (room_id, events)
- SELECT c.room_id, count(*) FROM current_state_events AS c
- LEFT JOIN room_stats_earliest_token AS t USING (room_id)
- WHERE t.room_id IS NULL
- GROUP BY c.room_id
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
-
- new_pos = yield self.get_max_stream_id_in_current_state_deltas()
- yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
- yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
- self.get_earliest_token_for_room_stats.invalidate_all()
-
- yield self._end_background_update("populate_stats_createtables")
- return 1
-
- @defer.inlineCallbacks
- def _populate_stats_cleanup(self, progress, batch_size):
- """
- Update the user directory stream position, then clean up the old tables.
- """
- if not self.stats_enabled:
- yield self._end_background_update("populate_stats_cleanup")
- return 1
-
- position = yield self._simple_select_one_onecol(
- TEMP_TABLE + "_position", None, "position"
- )
- yield self.update_stats_stream_pos(position)
-
- def _delete_staging_area(txn):
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
-
- yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
-
- yield self._end_background_update("populate_stats_cleanup")
- return 1
-
- @defer.inlineCallbacks
- def _populate_stats_process_rooms(self, progress, batch_size):
-
- if not self.stats_enabled:
- yield self._end_background_update("populate_stats_process_rooms")
- return 1
-
- # If we don't have progress filed, delete everything.
- if not progress:
- yield self.delete_all_stats()
-
- def _get_next_batch(txn):
- # Only fetch 250 rooms, so we don't fetch too many at once, even
- # if those 250 rooms have less than batch_size state events.
- sql = """
- SELECT room_id, events FROM %s_rooms
- ORDER BY events DESC
- LIMIT 250
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
- rooms_to_work_on = txn.fetchall()
-
- if not rooms_to_work_on:
- return None
-
- # Get how many are left to process, so we can give status on how
- # far we are in processing
- txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
- progress["remaining"] = txn.fetchone()[0]
-
- return rooms_to_work_on
-
- rooms_to_work_on = yield self.runInteraction(
- "populate_stats_temp_read", _get_next_batch
- )
-
- # No more rooms -- complete the transaction.
- if not rooms_to_work_on:
- yield self._end_background_update("populate_stats_process_rooms")
- return 1
-
- logger.info(
- "Processing the next %d rooms of %d remaining",
- len(rooms_to_work_on),
- progress["remaining"],
- )
-
- # Number of state events we've processed by going through each room
- processed_event_count = 0
-
- for room_id, event_count in rooms_to_work_on:
-
- current_state_ids = yield self.get_current_state_ids(room_id)
-
- join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
- history_visibility_id = current_state_ids.get(
- (EventTypes.RoomHistoryVisibility, "")
- )
- encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
- name_id = current_state_ids.get((EventTypes.Name, ""))
- topic_id = current_state_ids.get((EventTypes.Topic, ""))
- avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
- canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
-
- event_ids = [
- join_rules_id,
- history_visibility_id,
- encryption_id,
- name_id,
- topic_id,
- avatar_id,
- canonical_alias_id,
- ]
-
- state_events = yield self.get_events(
- [ev for ev in event_ids if ev is not None]
- )
-
- def _get_or_none(event_id, arg):
- event = state_events.get(event_id)
- if event:
- return event.content.get(arg)
- return None
-
- yield self.update_room_state(
- room_id,
- {
- "join_rules": _get_or_none(join_rules_id, "join_rule"),
- "history_visibility": _get_or_none(
- history_visibility_id, "history_visibility"
- ),
- "encryption": _get_or_none(encryption_id, "algorithm"),
- "name": _get_or_none(name_id, "name"),
- "topic": _get_or_none(topic_id, "topic"),
- "avatar": _get_or_none(avatar_id, "url"),
- "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
- },
- )
-
- now = self.hs.get_reactor().seconds()
-
- # quantise time to the nearest bucket
- now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
- def _fetch_data(txn):
-
- # Get the current token of the room
- current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
-
- current_state_events = len(current_state_ids)
-
- membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
-
- total_state_events = self._get_total_state_event_counts_txn(
- txn, room_id
- )
-
- self._update_stats_txn(
- txn,
- "room",
- room_id,
- now,
- {
- "bucket_size": self.stats_bucket_size,
- "current_state_events": current_state_events,
- "joined_members": membership_counts.get(Membership.JOIN, 0),
- "invited_members": membership_counts.get(Membership.INVITE, 0),
- "left_members": membership_counts.get(Membership.LEAVE, 0),
- "banned_members": membership_counts.get(Membership.BAN, 0),
- "state_events": total_state_events,
- },
- )
- self._simple_insert_txn(
- txn,
- "room_stats_earliest_token",
- {"room_id": room_id, "token": current_token},
- )
-
- # We've finished a room. Delete it from the table.
- self._simple_delete_one_txn(
- txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
- )
-
- yield self.runInteraction("update_room_stats", _fetch_data)
-
- # Update the remaining counter.
- progress["remaining"] -= 1
- yield self.runInteraction(
- "populate_stats",
- self._background_update_progress_txn,
- "populate_stats_process_rooms",
- progress,
- )
-
- processed_event_count += event_count
-
- if processed_event_count > batch_size:
- # Don't process any more rooms, we've hit our batch size.
- return processed_event_count
-
- return processed_event_count
-
- def delete_all_stats(self):
- """
- Delete all statistics records.
- """
-
- def _delete_all_stats_txn(txn):
- txn.execute("DELETE FROM room_state")
- txn.execute("DELETE FROM room_stats")
- txn.execute("DELETE FROM room_stats_earliest_token")
- txn.execute("DELETE FROM user_stats")
-
- return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
-
- def get_stats_stream_pos(self):
- return self._simple_select_one_onecol(
- table="stats_stream_pos",
- keyvalues={},
- retcol="stream_id",
- desc="stats_stream_pos",
+ self.register_noop_background_update(
+ "populate_stats_process_rooms"
)
-
- def update_stats_stream_pos(self, stream_id):
- return self._simple_update_one(
- table="stats_stream_pos",
- keyvalues={},
- updatevalues={"stream_id": stream_id},
- desc="update_stats_stream_pos",
+ self.register_noop_background_update(
+ "populate_stats_cleanup"
)
def update_room_state(self, room_id, fields):
@@ -366,119 +82,3 @@ class StatsStore(StateDeltasStore):
values=fields,
desc="update_room_state",
)
-
- def get_deltas_for_room(self, room_id, start, size=100):
- """
- Get statistics deltas for a given room.
-
- Args:
- room_id (str)
- start (int): Pagination start. Number of entries, not timestamp.
- size (int): How many entries to return.
-
- Returns:
- Deferred[list[dict]], where the dict has the keys of
- ABSOLUTE_STATS_FIELDS["room"] and "ts".
- """
- return self._simple_select_list_paginate(
- "room_stats",
- {"room_id": room_id},
- "ts",
- start,
- size,
- retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
- order_direction="DESC",
- )
-
- def get_all_room_state(self):
- return self._simple_select_list(
- "room_state", None, retcols=("name", "topic", "canonical_alias")
- )
-
- @cached()
- def get_earliest_token_for_room_stats(self, room_id):
- """
- Fetch the "earliest token". This is used by the room stats delta
- processor to ignore deltas that have been processed between the
- start of the background task and any particular room's stats
- being calculated.
-
- Returns:
- Deferred[int]
- """
- return self._simple_select_one_onecol(
- "room_stats_earliest_token",
- {"room_id": room_id},
- retcol="token",
- allow_none=True,
- )
-
- def update_stats(self, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert(
- table=table,
- keyvalues={id_col: stats_id, "ts": ts},
- values=fields,
- desc="update_stats",
- )
-
- def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert_txn(
- txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
- )
-
- def update_stats_delta(self, ts, stats_type, stats_id, field, value):
- def _update_stats_delta(txn):
- table, id_col = TYPE_TO_ROOM[stats_type]
-
- sql = (
- "SELECT * FROM %s"
- " WHERE %s=? and ts=("
- " SELECT MAX(ts) FROM %s"
- " WHERE %s=?"
- ")"
- ) % (table, id_col, table, id_col)
- txn.execute(sql, (stats_id, stats_id))
- rows = self.cursor_to_dict(txn)
- if len(rows) == 0:
- # silently skip as we don't have anything to apply a delta to yet.
- # this tries to minimise any race between the initial sync and
- # subsequent deltas arriving.
- return
-
- current_ts = ts
- latest_ts = rows[0]["ts"]
- if current_ts < latest_ts:
- # This one is in the past, but we're just encountering it now.
- # Mark it as part of the current bucket.
- current_ts = latest_ts
- elif ts != latest_ts:
- # we have to copy our absolute counters over to the new entry.
- values = {
- key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
- }
- values[id_col] = stats_id
- values["ts"] = ts
- values["bucket_size"] = self.stats_bucket_size
-
- self._simple_insert_txn(txn, table=table, values=values)
-
- # actually update the new value
- if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
- self._simple_update_txn(
- txn,
- table=table,
- keyvalues={id_col: stats_id, "ts": current_ts},
- updatevalues={field: value},
- )
- else:
- sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
- table,
- field,
- field,
- id_col,
- )
- txn.execute(sql, (value, stats_id, current_ts))
-
- return self.runInteraction("update_stats_delta", _update_stats_delta)
diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py
deleted file mode 100644
index a8b858eb4f..0000000000
--- a/tests/handlers/test_stats.py
+++ /dev/null
@@ -1,304 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from mock import Mock
-
-from twisted.internet import defer
-
-from synapse.api.constants import EventTypes, Membership
-from synapse.rest import admin
-from synapse.rest.client.v1 import login, room
-
-from tests import unittest
-
-
-class StatsRoomTests(unittest.HomeserverTestCase):
-
- servlets = [
- admin.register_servlets_for_client_rest_resource,
- room.register_servlets,
- login.register_servlets,
- ]
-
- def prepare(self, reactor, clock, hs):
-
- self.store = hs.get_datastore()
- self.handler = self.hs.get_stats_handler()
-
- def _add_background_updates(self):
- """
- Add the background updates we need to run.
- """
- # Ugh, have to reset this flag
- self.store._all_done = False
-
- self.get_success(
- self.store._simple_insert(
- "background_updates",
- {"update_name": "populate_stats_createtables", "progress_json": "{}"},
- )
- )
- self.get_success(
- self.store._simple_insert(
- "background_updates",
- {
- "update_name": "populate_stats_process_rooms",
- "progress_json": "{}",
- "depends_on": "populate_stats_createtables",
- },
- )
- )
- self.get_success(
- self.store._simple_insert(
- "background_updates",
- {
- "update_name": "populate_stats_cleanup",
- "progress_json": "{}",
- "depends_on": "populate_stats_process_rooms",
- },
- )
- )
-
- def test_initial_room(self):
- """
- The background updates will build the table from scratch.
- """
- r = self.get_success(self.store.get_all_room_state())
- self.assertEqual(len(r), 0)
-
- # Disable stats
- self.hs.config.stats_enabled = False
- self.handler.stats_enabled = False
-
- u1 = self.register_user("u1", "pass")
- u1_token = self.login("u1", "pass")
-
- room_1 = self.helper.create_room_as(u1, tok=u1_token)
- self.helper.send_state(
- room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
- )
-
- # Stats disabled, shouldn't have done anything
- r = self.get_success(self.store.get_all_room_state())
- self.assertEqual(len(r), 0)
-
- # Enable stats
- self.hs.config.stats_enabled = True
- self.handler.stats_enabled = True
-
- # Do the initial population of the user directory via the background update
- self._add_background_updates()
-
- while not self.get_success(self.store.has_completed_background_updates()):
- self.get_success(self.store.do_next_background_update(100), by=0.1)
-
- r = self.get_success(self.store.get_all_room_state())
-
- self.assertEqual(len(r), 1)
- self.assertEqual(r[0]["topic"], "foo")
-
- def test_initial_earliest_token(self):
- """
- Ingestion via notify_new_event will ignore tokens that the background
- update have already processed.
- """
- self.reactor.advance(86401)
-
- self.hs.config.stats_enabled = False
- self.handler.stats_enabled = False
-
- u1 = self.register_user("u1", "pass")
- u1_token = self.login("u1", "pass")
-
- u2 = self.register_user("u2", "pass")
- u2_token = self.login("u2", "pass")
-
- u3 = self.register_user("u3", "pass")
- u3_token = self.login("u3", "pass")
-
- room_1 = self.helper.create_room_as(u1, tok=u1_token)
- self.helper.send_state(
- room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
- )
-
- # Begin the ingestion by creating the temp tables. This will also store
- # the position that the deltas should begin at, once they take over.
- self.hs.config.stats_enabled = True
- self.handler.stats_enabled = True
- self.store._all_done = False
- self.get_success(self.store.update_stats_stream_pos(None))
-
- self.get_success(
- self.store._simple_insert(
- "background_updates",
- {"update_name": "populate_stats_createtables", "progress_json": "{}"},
- )
- )
-
- while not self.get_success(self.store.has_completed_background_updates()):
- self.get_success(self.store.do_next_background_update(100), by=0.1)
-
- # Now, before the table is actually ingested, add some more events.
- self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
- self.helper.join(room=room_1, user=u2, tok=u2_token)
-
- # Now do the initial ingestion.
- self.get_success(
- self.store._simple_insert(
- "background_updates",
- {"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
- )
- )
- self.get_success(
- self.store._simple_insert(
- "background_updates",
- {
- "update_name": "populate_stats_cleanup",
- "progress_json": "{}",
- "depends_on": "populate_stats_process_rooms",
- },
- )
- )
-
- self.store._all_done = False
- while not self.get_success(self.store.has_completed_background_updates()):
- self.get_success(self.store.do_next_background_update(100), by=0.1)
-
- self.reactor.advance(86401)
-
- # Now add some more events, triggering ingestion. Because of the stream
- # position being set to before the events sent in the middle, a simpler
- # implementation would reprocess those events, and say there were four
- # users, not three.
- self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
- self.helper.join(room=room_1, user=u3, tok=u3_token)
-
- # Get the deltas! There should be two -- day 1, and day 2.
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
-
- # The oldest has 2 joined members
- self.assertEqual(r[-1]["joined_members"], 2)
-
- # The newest has 3
- self.assertEqual(r[0]["joined_members"], 3)
-
- def test_incorrect_state_transition(self):
- """
- If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
- (JOIN, INVITE, LEAVE, BAN), an error is raised.
- """
- events = {
- "a1": {"membership": Membership.LEAVE},
- "a2": {"membership": "not a real thing"},
- }
-
- def get_event(event_id, allow_none=True):
- m = Mock()
- m.content = events[event_id]
- d = defer.Deferred()
- self.reactor.callLater(0.0, d.callback, m)
- return d
-
- def get_received_ts(event_id):
- return defer.succeed(1)
-
- self.store.get_received_ts = get_received_ts
- self.store.get_event = get_event
-
- deltas = [
- {
- "type": EventTypes.Member,
- "state_key": "some_user",
- "room_id": "room",
- "event_id": "a1",
- "prev_event_id": "a2",
- "stream_id": 60,
- }
- ]
-
- f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
- self.assertEqual(
- f.value.args[0], "'not a real thing' is not a valid prev_membership"
- )
-
- # And the other way...
- deltas = [
- {
- "type": EventTypes.Member,
- "state_key": "some_user",
- "room_id": "room",
- "event_id": "a2",
- "prev_event_id": "a1",
- "stream_id": 100,
- }
- ]
-
- f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
- self.assertEqual(
- f.value.args[0], "'not a real thing' is not a valid membership"
- )
-
- def test_redacted_prev_event(self):
- """
- If the prev_event does not exist, then it is assumed to be a LEAVE.
- """
- u1 = self.register_user("u1", "pass")
- u1_token = self.login("u1", "pass")
-
- room_1 = self.helper.create_room_as(u1, tok=u1_token)
-
- # Do the initial population of the user directory via the background update
- self._add_background_updates()
-
- while not self.get_success(self.store.has_completed_background_updates()):
- self.get_success(self.store.do_next_background_update(100), by=0.1)
-
- events = {"a1": None, "a2": {"membership": Membership.JOIN}}
-
- def get_event(event_id, allow_none=True):
- if events.get(event_id):
- m = Mock()
- m.content = events[event_id]
- else:
- m = None
- d = defer.Deferred()
- self.reactor.callLater(0.0, d.callback, m)
- return d
-
- def get_received_ts(event_id):
- return defer.succeed(1)
-
- self.store.get_received_ts = get_received_ts
- self.store.get_event = get_event
-
- deltas = [
- {
- "type": EventTypes.Member,
- "state_key": "some_user:test",
- "room_id": room_1,
- "event_id": "a2",
- "prev_event_id": "a1",
- "stream_id": 100,
- }
- ]
-
- # Handle our fake deltas, which has a user going from LEAVE -> JOIN.
- self.get_success(self.handler._handle_deltas(deltas))
-
- # One delta, with two joined members -- the room creator, and our fake
- # user.
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
- self.assertEqual(len(r), 1)
- self.assertEqual(r[0]["joined_members"], 2)
|