From b7f7cc7ace5977d6494581e64669a0bb68208cc1 Mon Sep 17 00:00:00 2001 From: Hubert Chathi Date: Wed, 14 Aug 2019 17:14:40 -0700 Subject: add the version field to the index for e2e_room_keys --- .../storage/schema/delta/56/fix_room_keys_index.sql | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 synapse/storage/schema/delta/56/fix_room_keys_index.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/fix_room_keys_index.sql b/synapse/storage/schema/delta/56/fix_room_keys_index.sql new file mode 100644 index 0000000000..014cb3b538 --- /dev/null +++ b/synapse/storage/schema/delta/56/fix_room_keys_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2019 Matrix.org Foundation CIC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- version is supposed to be part of the room keys index +CREATE UNIQUE INDEX e2e_room_keys_with_version_idx ON e2e_room_keys(user_id, version, room_id, session_id); +DROP INDEX IF EXISTS e2e_room_keys_idx; -- cgit 1.4.1 From 4dab867288167881e5d89c8743b633be109bf603 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 21 Aug 2019 13:16:28 +0100 Subject: Drop some unused tables. (#5893) These tables are never used, so we may as well drop them. 
--- changelog.d/5893.misc | 1 + synapse/storage/events.py | 14 ++------- synapse/storage/room.py | 35 ---------------------- .../schema/delta/56/drop_unused_event_tables.sql | 20 +++++++++++++ 4 files changed, 23 insertions(+), 47 deletions(-) create mode 100644 changelog.d/5893.misc create mode 100644 synapse/storage/schema/delta/56/drop_unused_event_tables.sql (limited to 'synapse/storage/schema') diff --git a/changelog.d/5893.misc b/changelog.d/5893.misc new file mode 100644 index 0000000000..07ee4888dc --- /dev/null +++ b/changelog.d/5893.misc @@ -0,0 +1 @@ +Drop some unused tables. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ac876287fc..6fcfa4d789 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1302,15 +1302,11 @@ class EventsStore( "event_reference_hashes", "event_search", "event_to_state_groups", - "guest_access", - "history_visibility", "local_invites", - "room_names", "state_events", "rejections", "redactions", "room_memberships", - "topics", ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), @@ -1454,10 +1450,10 @@ class EventsStore( for event, _ in events_and_contexts: if event.type == EventTypes.Name: - # Insert into the room_names and event_search tables. + # Insert into the event_search table. self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: - # Insert into the topics table and event_search table. + # Insert into the event_search table. self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Message: # Insert into the event_search table. @@ -1465,12 +1461,6 @@ class EventsStore( elif event.type == EventTypes.Redaction: # Insert into the redactions table. self._store_redaction(txn, event) - elif event.type == EventTypes.RoomHistoryVisibility: - # Insert into the event_search table. - self._store_history_visibility_txn(txn, event) - elif event.type == EventTypes.GuestAccess: - # Insert into the event_search table. 
- self._store_guest_access_txn(txn, event) self._handle_event_relations(txn, event) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index bc606292b8..08e13f3a3b 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -386,32 +386,12 @@ class RoomStore(RoomWorkerStore, SearchStore): def _store_room_topic_txn(self, txn, event): if hasattr(event, "content") and "topic" in event.content: - self._simple_insert_txn( - txn, - "topics", - { - "event_id": event.event_id, - "room_id": event.room_id, - "topic": event.content["topic"], - }, - ) - self.store_event_search_txn( txn, event, "content.topic", event.content["topic"] ) def _store_room_name_txn(self, txn, event): if hasattr(event, "content") and "name" in event.content: - self._simple_insert_txn( - txn, - "room_names", - { - "event_id": event.event_id, - "room_id": event.room_id, - "name": event.content["name"], - }, - ) - self.store_event_search_txn( txn, event, "content.name", event.content["name"] ) @@ -422,21 +402,6 @@ class RoomStore(RoomWorkerStore, SearchStore): txn, event, "content.body", event.content["body"] ) - def _store_history_visibility_txn(self, txn, event): - self._store_content_index_txn(txn, event, "history_visibility") - - def _store_guest_access_txn(self, txn, event): - self._store_content_index_txn(txn, event, "guest_access") - - def _store_content_index_txn(self, txn, event, key): - if hasattr(event, "content") and key in event.content: - sql = ( - "INSERT INTO %(key)s" - " (event_id, room_id, %(key)s)" - " VALUES (?, ?, ?)" % {"key": key} - ) - txn.execute(sql, (event.event_id, event.room_id, event.content[key])) - def add_event_report( self, room_id, event_id, user_id, reason, content, received_ts ): diff --git a/synapse/storage/schema/delta/56/drop_unused_event_tables.sql b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql new file mode 100644 index 0000000000..9f09922c67 --- /dev/null +++ b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql @@ 
-0,0 +1,20 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- these tables are never used. +DROP TABLE IF EXISTS room_names; +DROP TABLE IF EXISTS topics; +DROP TABLE IF EXISTS history_visibility; +DROP TABLE IF EXISTS guest_access; -- cgit 1.4.1 From c9f11d09fc85470cf9a36909104734a3682c4b39 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 22 Aug 2019 10:43:13 +0100 Subject: Add missing index on users_in_public_rooms. (#5894) --- changelog.d/5894.misc | 1 + .../schema/delta/56/users_in_public_rooms_idx.sql | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 changelog.d/5894.misc create mode 100644 synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql (limited to 'synapse/storage/schema') diff --git a/changelog.d/5894.misc b/changelog.d/5894.misc new file mode 100644 index 0000000000..fca4485ff7 --- /dev/null +++ b/changelog.d/5894.misc @@ -0,0 +1 @@ +Add missing index on users_in_public_rooms to improve the performance of directory queries. 
diff --git a/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql new file mode 100644 index 0000000000..149f8be8b6 --- /dev/null +++ b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql @@ -0,0 +1,17 @@ +/* Copyright 2019 Matrix.org Foundation CIC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- this was apparently forgotten when the table was created back in delta 53. +CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id); -- cgit 1.4.1 From 8767b63a821eb8612e2ab830534fd6f40eb1aaaa Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 22 Aug 2019 18:21:10 +0100 Subject: Propagate opentracing contexts through EDUs (#5852) Propagate opentracing contexts through EDUs Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/5852.feature | 1 + docs/opentracing.rst | 27 +++- synapse/federation/federation_server.py | 15 +- synapse/federation/sender/transaction_manager.py | 170 ++++++++++++--------- synapse/federation/units.py | 3 + synapse/handlers/devicemessage.py | 27 +++- synapse/logging/opentracing.py | 26 ++++ synapse/storage/devices.py | 39 ++++- .../schema/delta/56/add_spans_to_device_lists.sql | 20 +++ 9 files changed, 234 insertions(+), 94 deletions(-) create mode 100644 changelog.d/5852.feature create mode 100644 synapse/storage/schema/delta/56/add_spans_to_device_lists.sql (limited to 
'synapse/storage/schema') diff --git a/changelog.d/5852.feature b/changelog.d/5852.feature new file mode 100644 index 0000000000..4a0fc6c542 --- /dev/null +++ b/changelog.d/5852.feature @@ -0,0 +1 @@ +Pass opentracing contexts between servers when transmitting EDUs. diff --git a/docs/opentracing.rst b/docs/opentracing.rst index b91a2208a8..6e98ab56ba 100644 --- a/docs/opentracing.rst +++ b/docs/opentracing.rst @@ -32,7 +32,7 @@ It is up to the remote server to decide what it does with the spans it creates. This is called the sampling policy and it can be configured through Jaeger's settings. -For OpenTracing concepts see +For OpenTracing concepts see https://opentracing.io/docs/overview/what-is-tracing/. For more information about Jaeger's implementation see @@ -79,7 +79,7 @@ Homeserver whitelisting The homeserver whitelist is configured using regular expressions. A list of regular expressions can be given and their union will be compared when propagating any -spans contexts to another homeserver. +spans contexts to another homeserver. Though it's mostly safe to send and receive span contexts to and from untrusted users since span contexts are usually opaque ids it can lead to @@ -92,6 +92,29 @@ two problems, namely: but that doesn't prevent another server sending you baggage which will be logged to OpenTracing's logs. +========== +EDU FORMAT +========== + +EDUs can contain tracing data in their content. This is not specced but +it could be of interest for other homeservers. + +EDU format (if you're using jaeger): + +.. code-block:: json + + { + "edu_type": "type", + "content": { + "org.matrix.opentracing_context": { + "uber-trace-id": "fe57cf3e65083289" + } + } + } + +Though you don't have to use jaeger you must inject the span context into +`org.matrix.opentracing_context` using the opentracing `Format.TEXT_MAP` inject method. 
+ ================== Configuring Jaeger ================== diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9286ca3202..05fd49f3c1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -43,7 +43,7 @@ from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name from synapse.logging.context import nested_logging_context -from synapse.logging.opentracing import log_kv, trace +from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace from synapse.logging.utils import log_function from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, @@ -811,12 +811,13 @@ class FederationHandlerRegistry(object): if not handler: logger.warn("No handler registered for EDU type %s", edu_type) - try: - yield handler(origin, content) - except SynapseError as e: - logger.info("Failed to handle edu %r: %r", edu_type, e) - except Exception: - logger.exception("Failed to handle edu %r", edu_type) + with start_active_span_from_edu(content, "handle_edu"): + try: + yield handler(origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception: + logger.exception("Failed to handle edu %r", edu_type) def on_query(self, query_type, args): handler = self.query_handlers.get(query_type) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 52706302f2..62ca6a3e87 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -14,11 +14,19 @@ # limitations under the License. 
import logging +from canonicaljson import json + from twisted.internet import defer from synapse.api.errors import HttpResponseException from synapse.federation.persistence import TransactionActions from synapse.federation.units import Transaction +from synapse.logging.opentracing import ( + extract_text_map, + set_tag, + start_active_span_follows_from, + tags, +) from synapse.util.metrics import measure_func logger = logging.getLogger(__name__) @@ -44,93 +52,109 @@ class TransactionManager(object): @defer.inlineCallbacks def send_new_transaction(self, destination, pending_pdus, pending_edus): - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus + # Make a transaction-sending opentracing span. This span follows on from + # all the edus in that transaction. This needs to be done since there is + # no active span here, so if the edus were not received by the remote the + # span would have no causality and it would be forgotten. + # The span_contexts is a generator so that it won't be evaluated if + # opentracing is disabled. (Yay speed!) 
- success = True + span_contexts = ( + extract_text_map(json.loads(edu.get_context())) for edu in pending_edus + ) - logger.debug("TX [%s] _attempt_new_transaction", destination) + with start_active_span_follows_from("send_transaction", span_contexts): - txn_id = str(self._next_txn_id) + # Sort based on the order field + pending_pdus.sort(key=lambda t: t[1]) + pdus = [x[0] for x in pending_pdus] + edus = pending_edus - logger.debug( - "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", - destination, - txn_id, - len(pdus), - len(edus), - ) + success = True - transaction = Transaction.create_new( - origin_server_ts=int(self.clock.time_msec()), - transaction_id=txn_id, - origin=self._server_name, - destination=destination, - pdus=pdus, - edus=edus, - ) + logger.debug("TX [%s] _attempt_new_transaction", destination) - self._next_txn_id += 1 + txn_id = str(self._next_txn_id) - logger.info( - "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", - destination, - txn_id, - transaction.transaction_id, - len(pdus), - len(edus), - ) + logger.debug( + "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", + destination, + txn_id, + len(pdus), + len(edus), + ) - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self.clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self._transport_layer.send_transaction( - transaction, json_data_cb + transaction = Transaction.create_new( + origin_server_ts=int(self.clock.time_msec()), + transaction_id=txn_id, + origin=self._server_name, + destination=destination, + pdus=pdus, + edus=edus, ) - code = 200 - except HttpResponseException as e: - code = e.code - response = e.response - if e.code in (401, 
404, 429) or 500 <= e.code: - logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) - raise e + self._next_txn_id += 1 - logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) + logger.info( + "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", + destination, + txn_id, + transaction.transaction_id, + len(pdus), + len(edus), + ) - if code == 200: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self.clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self._transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response + + if e.code in (401, 404, 429) or 500 <= e.code: + logger.info( + "TX [%s] {%s} got %d response", destination, txn_id, code + ) + raise e + + logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) + + if code == 200: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "TX [%s] {%s} Remote returned error for %s: %s", + destination, + txn_id, + e_id, + r, + ) + else: + for p in pdus: logger.warn( - "TX [%s] {%s} Remote returned error for %s: %s", + "TX [%s] {%s} Failed to send event %s", destination, txn_id, - e_id, - r, + p.event_id, ) - else: - for p in pdus: - logger.warn( - "TX [%s] {%s} Failed to send event %s", - destination, - txn_id, - p.event_id, - ) - success = False + success = False - return success + set_tag(tags.ERROR, not success) + return success diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 14aad8f09d..aa84621206 100644 --- 
a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -38,6 +38,9 @@ class Edu(JsonEncodedObject): internal_keys = ["origin", "destination"] + def get_context(self): + return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}") + class Transaction(JsonEncodedObject): """ A transaction is a list of Pdus and Edus to be sent to a remote home diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index e1ebb6346c..c7d56779b8 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -15,9 +15,17 @@ import logging +from canonicaljson import json + from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.logging.opentracing import ( + get_active_span_text_map, + set_tag, + start_active_span, + whitelisted_homeserver, +) from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string @@ -100,14 +108,21 @@ class DeviceMessageHandler(object): message_id = random_string(16) + context = get_active_span_text_map() + remote_edu_contents = {} for destination, messages in remote_messages.items(): - remote_edu_contents[destination] = { - "messages": messages, - "sender": sender_user_id, - "type": message_type, - "message_id": message_id, - } + with start_active_span("to_device_for_user"): + set_tag("destination", destination) + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + "org.matrix.opentracing_context": json.dumps(context) + if whitelisted_homeserver(destination) + else None, + } stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 4abea4474b..dd296027a1 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -149,6 +149,9 @@ unchartered waters will require the 
enforcement of the whitelist. ``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes in a destination and compares it to the whitelist. +Most injection methods take a 'destination' arg. The context will only be injected +if the destination matches the whitelist or the destination is None. + ======= Gotchas ======= @@ -576,6 +579,29 @@ def inject_active_span_text_map(carrier, destination, check_destination=True): ) +def get_active_span_text_map(destination=None): + """ + Gets a span context as a dict. This can be used instead of manually + injecting a span into an empty carrier. + + Args: + destination (str): the name of the remote server. + + Returns: + dict: the active span's context if opentracing is enabled, otherwise empty. + """ + + if not opentracing or (destination and not whitelisted_homeserver(destination)): + return {} + + carrier = {} + opentracing.tracer.inject( + opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier + ) + + return carrier + + def active_span_context_as_string(): """ Returns: diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 8f72d92895..e11881161d 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,11 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.logging.opentracing import ( + get_active_span_text_map, + trace, + whitelisted_homeserver, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage.background_updates import BackgroundUpdateStore @@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore): return {d["device_id"]: d for d in devices} + @trace @defer.inlineCallbacks def get_devices_by_remote(self, destination, from_stream_id, limit): """Get stream of updates to send to remote servers @@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore): # 
(user_id, device_id) entries into a map, with the value being # the max stream_id across each set of duplicate entries # - # maps (user_id, device_id) -> stream_id + # maps (user_id, device_id) -> (stream_id, opentracing_context) # as long as their stream_id does not match that of the last row + # + # opentracing_context contains the opentracing metadata for the request + # that created the poke + # + # The most recent request's opentracing_context is used as the + # context which created the Edu. + query_map = {} for update in updates: if stream_id_cutoff is not None and update[2] >= stream_id_cutoff: @@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore): break key = (update[0], update[1]) - query_map[key] = max(query_map.get(key, 0), update[2]) + + update_context = update[3] + update_stream_id = update[2] + + previous_update_stream_id, _ = query_map.get(key, (0, None)) + + if update_stream_id > previous_update_stream_id: + query_map[key] = (update_stream_id, update_context) # If we didn't find any updates with a stream_id lower than the cutoff, it # means that there are more than limit updates all of which have the same @@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore): List: List of device updates """ sql = """ - SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes + SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? ORDER BY stream_id LIMIT ? 
@@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore): Args: destination (str): The host the device updates are intended for from_stream_id (int): The minimum stream_id to filter updates by, exclusive - query_map (Dict[(str, str): int]): Dictionary mapping - user_id/device_id to update stream_id + query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping + user_id/device_id to update stream_id and the relevant json-encoded + opentracing context Returns: List[Dict]: List of objects representing an device update EDU @@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore): destination, user_id, from_stream_id ) for device_id, device in iteritems(user_devices): - stream_id = query_map[(user_id, device_id)] + stream_id, opentracing_context = query_map[(user_id, device_id)] result = { "user_id": user_id, "device_id": device_id, "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, + "org.matrix.opentracing_context": opentracing_context, } prev_id = stream_id @@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): ], ) + context = get_active_span_text_map() + self._simple_insert_many_txn( txn, table="device_lists_outbound_pokes", @@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): "device_id": device_id, "sent": False, "ts": now, + "opentracing_context": json.dumps(context) + if whitelisted_homeserver(destination) + else None, } for destination in hosts for device_id in device_ids diff --git a/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql new file mode 100644 index 0000000000..41807eb1e7 --- /dev/null +++ b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql @@ -0,0 +1,20 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Opentracing context data for inclusion in the device_list_update EDUs, as a + * json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination). + */ +ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT; -- cgit 1.4.1 From a4bf72c30c5953b721a64eae89db186fa8735bb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2019 17:38:51 +0100 Subject: Censor redactions in DB after a month --- synapse/storage/events.py | 88 +++++++++++++++++++++- .../storage/schema/delta/56/redaction_censor.sql | 17 +++++ tests/storage/test_redaction.py | 71 +++++++++++++++++ 3 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/56/redaction_censor.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a95c36a8b..2970da6829 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,7 +23,7 @@ from functools import wraps from six import iteritems, text_type from six.moves import range -from canonicaljson import json +from canonicaljson import encode_canonical_json, json from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.events.utils import prune_event_dict from synapse.logging.context import PreserveLoggingContext, 
make_deferred_yieldable from synapse.logging.utils import log_function from synapse.metrics import BucketCollector @@ -262,6 +263,13 @@ class EventsStore( hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + def _censor_redactions(): + return run_as_background_process( + "_censor_redactions", self._censor_redactions + ) + + hs.get_clock().looping_call(_censor_redactions, 10 * 60 * 1000) + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -1548,6 +1556,84 @@ class EventsStore( (event.event_id, event.redacts), ) + @defer.inlineCallbacks + def _censor_redactions(self): + """Censors all redactions older than a month that haven't been censored. + + By censor we mean update the event_json table with the redacted event. + + Returns: + Deferred + """ + + if self.stream_ordering_month_ago is None: + return + + max_pos = self.stream_ordering_month_ago + + # We fetch all redactions that point to an event that we have that has + # a stream ordering from over a month ago, that we haven't yet censored + # in the DB. + sql = """ + SELECT er.event_id, redacts FROM redactions + INNER JOIN events AS er USING (event_id) + INNER JOIN events AS eb ON (er.room_id = eb.room_id AND redacts = eb.event_id) + WHERE NOT have_censored + AND ? <= er.stream_ordering AND er.stream_ordering <= ? + ORDER BY er.stream_ordering ASC + LIMIT ? + """ + + rows = yield self._execute( + "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100 + ) + + updates = [] + + for redaction_id, event_id in rows: + redaction_event = yield self.get_event(redaction_id, allow_none=True) + original_event = yield self.get_event( + event_id, allow_rejected=True, allow_none=True + ) + + # The SQL above ensures that we have both the redaction and + # original event, so if the `get_event` calls return None it + # means that the redaction wasn't allowed. Either way we know that + # the result won't change so we mark the fact that we've checked. 
+ if ( + redaction_event + and original_event + and original_event.internal_metadata.is_redacted() + ): + # Redaction was allowed + pruned_json = encode_canonical_json( + prune_event_dict(original_event.get_dict()) + ) + else: + # Redaction wasn't allowed + pruned_json = None + + updates.append((redaction_id, event_id, pruned_json)) + + def _update_censor_txn(txn): + for redaction_id, event_id, pruned_json in updates: + if pruned_json: + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + + self._simple_update_one_txn( + txn, + table="redactions", + keyvalues={"event_id": redaction_id}, + updatevalues={"have_censored": True}, + ) + + yield self.runInteraction("_update_censor_txn", _update_censor_txn) + @defer.inlineCallbacks def count_daily_messages(self): """ diff --git a/synapse/storage/schema/delta/56/redaction_censor.sql b/synapse/storage/schema/delta/56/redaction_censor.sql new file mode 100644 index 0000000000..fe51b02309 --- /dev/null +++ b/synapse/storage/schema/delta/56/redaction_censor.sql @@ -0,0 +1,17 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +ALTER TABLE redactions ADD COLUMN have_censored BOOL NOT NULL DEFAULT false; +CREATE INDEX redactions_have_censored ON redactions(event_id) WHERE not have_censored; diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index d961b81d48..0c9f3c7071 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -17,6 +17,8 @@ from mock import Mock +from canonicaljson import json + from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -286,3 +288,72 @@ class RedactionTestCase(unittest.HomeserverTestCase): self.assertEqual( fetched.unsigned["redacted_because"].event_id, redaction_event_id2 ) + + def test_redact_censor(self): + """Test that a redacted event gets censored in the DB after a month + """ + + self.get_success( + self.inject_room_member(self.room1, self.u_alice, Membership.JOIN) + ) + + msg_event = self.get_success(self.inject_message(self.room1, self.u_alice, "t")) + + # Check event has not been redacted: + event = self.get_success(self.store.get_event(msg_event.event_id)) + + self.assertObjectHasAttributes( + { + "type": EventTypes.Message, + "user_id": self.u_alice.to_string(), + "content": {"body": "t", "msgtype": "message"}, + }, + event, + ) + + self.assertFalse("redacted_because" in event.unsigned) + + # Redact event + reason = "Because I said so" + self.get_success( + self.inject_redaction(self.room1, msg_event.event_id, self.u_alice, reason) + ) + + event = self.get_success(self.store.get_event(msg_event.event_id)) + + self.assertTrue("redacted_because" in event.unsigned) + + self.assertObjectHasAttributes( + { + "type": EventTypes.Message, + "user_id": self.u_alice.to_string(), + "content": {}, + }, + event, + ) + + event_json = self.get_success( + self.store._simple_select_one_onecol( + table="event_json", + keyvalues={"event_id": msg_event.event_id}, + retcol="json", + ) + ) + + self.assert_dict( + {"content": {"body": "t", "msgtype": "message"}}, 
json.loads(event_json) + ) + + # Advance by 30 days + self.reactor.advance(60 * 60 * 24 * 31) + self.reactor.advance(60 * 60 * 2) + + event_json = self.get_success( + self.store._simple_select_one_onecol( + table="event_json", + keyvalues={"event_id": msg_event.event_id}, + retcol="json", + ) + ) + + self.assert_dict({"content": {}}, json.loads(event_json)) -- cgit 1.4.1 From 6e834e94fcc97811e4cc8185e86c6b9da06eb28e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Sep 2019 13:04:27 +0100 Subject: Fix and refactor room and user stats (#5971) Previously the stats were not being correctly populated. --- changelog.d/5971.bugfix | 1 + docs/room_and_user_statistics.md | 62 ++ synapse/config/stats.py | 13 +- synapse/handlers/stats.py | 307 +++--- synapse/storage/events.py | 5 +- synapse/storage/registration.py | 12 + synapse/storage/roommember.py | 44 +- .../storage/schema/delta/56/stats_separated.sql | 152 +++ synapse/storage/stats.py | 1036 ++++++++++++++------ tests/handlers/test_stats.py | 643 +++++++++--- tests/rest/client/v1/utils.py | 8 +- 11 files changed, 1642 insertions(+), 641 deletions(-) create mode 100644 changelog.d/5971.bugfix create mode 100644 docs/room_and_user_statistics.md create mode 100644 synapse/storage/schema/delta/56/stats_separated.sql (limited to 'synapse/storage/schema') diff --git a/changelog.d/5971.bugfix b/changelog.d/5971.bugfix new file mode 100644 index 0000000000..9ea095103b --- /dev/null +++ b/changelog.d/5971.bugfix @@ -0,0 +1 @@ +Fix room and user stats tracking. diff --git a/docs/room_and_user_statistics.md b/docs/room_and_user_statistics.md new file mode 100644 index 0000000000..e1facb38d4 --- /dev/null +++ b/docs/room_and_user_statistics.md @@ -0,0 +1,62 @@ +Room and User Statistics +======================== + +Synapse maintains room and user statistics (as well as a cache of room state), +in various tables. These can be used for administrative purposes but are also +used when generating the public room directory. 
+ + +# Synapse Developer Documentation + +## High-Level Concepts + +### Definitions + +* **subject**: Something we are tracking stats about – currently a room or user. +* **current row**: An entry for a subject in the appropriate current statistics + table. Each subject can have only one. +* **historical row**: An entry for a subject in the appropriate historical + statistics table. Each subject can have any number of these. + +### Overview + +Stats are maintained as time series. There are two kinds of column: + +* absolute columns – where the value is correct for the time given by `end_ts` + in the stats row. (Imagine a line graph for these values) + * They can also be thought of as 'gauges' in Prometheus, if you are familiar. +* per-slice columns – where the value corresponds to how many occurrences + took place within the time slice given by `(end_ts − bucket_size)…end_ts` + or `start_ts…end_ts`. (Imagine a histogram for these values) + +Stats are maintained in two tables (for each type): current and historical. + +Current stats correspond to the present values. Each subject can only have one +entry. + +Historical stats correspond to values in the past. Subjects may have multiple +entries. + +## Concepts around the management of stats + +### Current rows + +Current rows contain the most up-to-date statistics for a subject. +They only contain absolute columns. + +### Historical rows + +Historical rows can always be considered to be valid for the time slice and +end time specified. + +* historical rows will not exist for every time slice – they will be omitted + if there were no changes. In this case, the following assumptions can be + made to interpolate/recreate missing rows: + - absolute fields have the same values as in the preceding row + - per-slice fields are zero (`0`) +* historical rows will not be retained forever – rows older than a configurable + time will be purged. + +#### Purge + +The purging of historical rows is not yet implemented. 
diff --git a/synapse/config/stats.py b/synapse/config/stats.py index b518a3ed9c..b18ddbd1fa 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -27,19 +27,16 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True - self.stats_bucket_size = 86400 + self.stats_bucket_size = 86400 * 1000 self.stats_retention = sys.maxsize stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) - self.stats_bucket_size = ( - self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + self.stats_bucket_size = self.parse_duration( + stats_config.get("bucket_size", "1d") ) - self.stats_retention = ( - self.parse_duration( - stats_config.get("retention", "%ds" % (sys.maxsize,)) - ) - / 1000 + self.stats_retention = self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) ) def generate_config_section(self, config_dir_path, server_name, **kwargs): diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 4449da6669..921735edb3 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -14,15 +14,14 @@ # limitations under the License. 
import logging +from collections import Counter from twisted.internet import defer -from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.api.constants import EventTypes, Membership from synapse.handlers.state_deltas import StateDeltasHandler from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import UserID -from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -62,11 +61,10 @@ class StatsHandler(StateDeltasHandler): def notify_new_event(self): """Called when there may be more deltas to process """ - if not self.hs.config.stats_enabled: + if not self.hs.config.stats_enabled or self._is_processing: return - if self._is_processing: - return + self._is_processing = True @defer.inlineCallbacks def process(): @@ -75,39 +73,72 @@ class StatsHandler(StateDeltasHandler): finally: self._is_processing = False - self._is_processing = True run_as_background_process("stats.notify_new_event", process) @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB if self.pos is None: - self.pos = yield self.store.get_stats_stream_pos() - - # If still None then the initial background update hasn't happened yet - if self.pos is None: - return None + self.pos = yield self.store.get_stats_positions() # Loop round handling deltas until we're up to date + while True: - with Measure(self.clock, "stats_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) - if not deltas: - return + deltas = yield self.store.get_current_state_deltas(self.pos) + + if deltas: + logger.debug("Handling %d state deltas", len(deltas)) + room_deltas, user_deltas = yield self._handle_deltas(deltas) + + max_pos = deltas[-1]["stream_id"] + else: + room_deltas = {} + user_deltas = {} + max_pos = yield self.store.get_room_max_stream_ordering() - logger.info("Handling %d state deltas", 
len(deltas)) - yield self._handle_deltas(deltas) + # Then count deltas for total_events and total_event_bytes. + room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes( + self.pos, max_pos + ) + + for room_id, fields in room_count.items(): + room_deltas.setdefault(room_id, {}).update(fields) + + for user_id, fields in user_count.items(): + user_deltas.setdefault(user_id, {}).update(fields) + + logger.debug("room_deltas: %s", room_deltas) + logger.debug("user_deltas: %s", user_deltas) - self.pos = deltas[-1]["stream_id"] - yield self.store.update_stats_stream_pos(self.pos) + # Always call this so that we update the stats position. + yield self.store.bulk_update_stats_delta( + self.clock.time_msec(), + updates={"room": room_deltas, "user": user_deltas}, + stream_id=max_pos, + ) + + event_processing_positions.labels("stats").set(max_pos) - event_processing_positions.labels("stats").set(self.pos) + if self.pos == max_pos: + break + + self.pos = max_pos @defer.inlineCallbacks def _handle_deltas(self, deltas): + """Called with the state deltas to process + + Returns: + Deferred[tuple[dict[str, Counter], dict[str, Counter]]] + Resolves to two dicts, the room deltas and the user deltas, + mapping from room/user ID to changes in the various fields. 
""" - Called with the state deltas to process - """ + + room_to_stats_deltas = {} + user_to_stats_deltas = {} + + room_to_state_updates = {} + for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -115,11 +146,10 @@ class StatsHandler(StateDeltasHandler): event_id = delta["event_id"] stream_id = delta["stream_id"] prev_event_id = delta["prev_event_id"] - stream_pos = delta["stream_id"] - logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id) - token = yield self.store.get_earliest_token_for_room_stats(room_id) + token = yield self.store.get_earliest_token_for_stats("room", room_id) # If the earliest token to begin from is larger than our current # stream ID, skip processing this delta. @@ -131,203 +161,130 @@ class StatsHandler(StateDeltasHandler): continue if event_id is None and prev_event_id is None: - # Errr... + logger.error( + "event ID is None and so is the previous event ID. stream_id: %s", + stream_id, + ) continue event_content = {} + sender = None if event_id is not None: event = yield self.store.get_event(event_id, allow_none=True) if event: event_content = event.content or {} + sender = event.sender + + # All the values in this dict are deltas (RELATIVE changes) + room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter()) - # We use stream_pos here rather than fetch by event_id as event_id - # may be None - now = yield self.store.get_received_ts_by_stream_pos(stream_pos) + room_state = room_to_state_updates.setdefault(room_id, {}) - # quantise time to the nearest bucket - now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size + if prev_event_id is None: + # this state event doesn't overwrite another, + # so it is a new effective/current state event + room_stats_delta["current_state_events"] += 1 if typ == EventTypes.Member: # we could use _get_key_change here but it's a bit inefficient # given we're not testing for a 
specific result; might as well # just grab the prev_membership and membership strings and # compare them. - prev_event_content = {} + # We take None rather than leave as a previous membership + # in the absence of a previous event because we do not want to + # reduce the leave count when a new-to-the-room user joins. + prev_membership = None if prev_event_id is not None: prev_event = yield self.store.get_event( prev_event_id, allow_none=True ) if prev_event: prev_event_content = prev_event.content + prev_membership = prev_event_content.get( + "membership", Membership.LEAVE + ) membership = event_content.get("membership", Membership.LEAVE) - prev_membership = prev_event_content.get("membership", Membership.LEAVE) - - if prev_membership == membership: - continue - if prev_membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", -1 - ) + if prev_membership is None: + logger.debug("No previous membership for this user.") + elif membership == prev_membership: + pass # noop + elif prev_membership == Membership.JOIN: + room_stats_delta["joined_members"] -= 1 elif prev_membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", -1 - ) + room_stats_delta["invited_members"] -= 1 elif prev_membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", -1 - ) + room_stats_delta["left_members"] -= 1 elif prev_membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", -1 - ) + room_stats_delta["banned_members"] -= 1 else: - err = "%s is not a valid prev_membership" % (repr(prev_membership),) - logger.error(err) - raise ValueError(err) + raise ValueError( + "%r is not a valid prev_membership" % (prev_membership,) + ) + if membership == prev_membership: + pass # noop if membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, 
"joined_members", +1 - ) + room_stats_delta["joined_members"] += 1 elif membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", +1 - ) + room_stats_delta["invited_members"] += 1 + + if sender and self.is_mine_id(sender): + user_to_stats_deltas.setdefault(sender, Counter())[ + "invites_sent" + ] += 1 + elif membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", +1 - ) + room_stats_delta["left_members"] += 1 elif membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", +1 - ) + room_stats_delta["banned_members"] += 1 else: - err = "%s is not a valid membership" % (repr(membership),) - logger.error(err) - raise ValueError(err) + raise ValueError("%r is not a valid membership" % (membership,)) user_id = state_key if self.is_mine_id(user_id): - # update user_stats as it's one of our users - public = yield self._is_public_room(room_id) - - if membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - -1, - ) - elif membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - +1, - ) + # this accounts for transitions like leave → ban and so on. + has_changed_joinedness = (prev_membership == Membership.JOIN) != ( + membership == Membership.JOIN + ) - elif typ == EventTypes.Create: - # Newly created room. Add it with all blank portions. 
- yield self.store.update_room_state( - room_id, - { - "join_rules": None, - "history_visibility": None, - "encryption": None, - "name": None, - "topic": None, - "avatar": None, - "canonical_alias": None, - }, - ) + if has_changed_joinedness: + delta = +1 if membership == Membership.JOIN else -1 - elif typ == EventTypes.JoinRules: - yield self.store.update_room_state( - room_id, {"join_rules": event_content.get("join_rule")} - ) + user_to_stats_deltas.setdefault(user_id, Counter())[ + "joined_rooms" + ] += delta - is_public = yield self._get_key_change( - prev_event_id, event_id, "join_rule", JoinRules.PUBLIC - ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + room_stats_delta["local_users_in_room"] += delta + elif typ == EventTypes.Create: + room_state["is_federatable"] = event_content.get("m.federate", True) + if sender and self.is_mine_id(sender): + user_to_stats_deltas.setdefault(sender, Counter())[ + "rooms_created" + ] += 1 + elif typ == EventTypes.JoinRules: + room_state["join_rules"] = event_content.get("join_rule") elif typ == EventTypes.RoomHistoryVisibility: - yield self.store.update_room_state( - room_id, - {"history_visibility": event_content.get("history_visibility")}, - ) - - is_public = yield self._get_key_change( - prev_event_id, event_id, "history_visibility", "world_readable" + room_state["history_visibility"] = event_content.get( + "history_visibility" ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) - elif typ == EventTypes.Encryption: - yield self.store.update_room_state( - room_id, {"encryption": event_content.get("algorithm")} - ) + room_state["encryption"] = event_content.get("algorithm") elif typ == EventTypes.Name: - yield self.store.update_room_state( - room_id, {"name": event_content.get("name")} - ) + room_state["name"] = event_content.get("name") elif typ == EventTypes.Topic: - yield self.store.update_room_state( - room_id, {"topic": 
event_content.get("topic")} - ) + room_state["topic"] = event_content.get("topic") elif typ == EventTypes.RoomAvatar: - yield self.store.update_room_state( - room_id, {"avatar": event_content.get("url")} - ) + room_state["avatar"] = event_content.get("url") elif typ == EventTypes.CanonicalAlias: - yield self.store.update_room_state( - room_id, {"canonical_alias": event_content.get("alias")} - ) + room_state["canonical_alias"] = event_content.get("alias") + elif typ == EventTypes.GuestAccess: + room_state["guest_access"] = event_content.get("guest_access") - @defer.inlineCallbacks - def update_public_room_stats(self, ts, room_id, is_public): - """ - Increment/decrement a user's number of public rooms when a room they are - in changes to/from public visibility. + for room_id, state in room_to_state_updates.items(): + yield self.store.update_room_state(room_id, state) - Args: - ts (int): Timestamp in seconds - room_id (str) - is_public (bool) - """ - # For now, blindly iterate over all local users in the room so that - # we can handle the whole problem of copying buckets over as needed - user_ids = yield self.store.get_users_in_room(room_id) - - for user_id in user_ids: - if self.hs.is_mine(UserID.from_string(user_id)): - yield self.store.update_stats_delta( - ts, "user", user_id, "public_rooms", +1 if is_public else -1 - ) - yield self.store.update_stats_delta( - ts, "user", user_id, "private_rooms", -1 if is_public else +1 - ) - - @defer.inlineCallbacks - def _is_public_room(self, room_id): - join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules) - history_visibility = yield self.state.get_current_state( - room_id, EventTypes.RoomHistoryVisibility - ) - - if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or ( - ( - history_visibility - and history_visibility.content.get("history_visibility") - == "world_readable" - ) - ): - return True - else: - return False + return room_to_stats_deltas, user_to_stats_deltas diff 
--git a/synapse/storage/events.py b/synapse/storage/events.py index 32050868ff..1958afe1d7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2270,8 +2270,9 @@ class EventsStore( "room_aliases", "room_depth", "room_memberships", - "room_state", - "room_stats", + "room_stats_state", + "room_stats_current", + "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 3f50324253..2d3c7e2dc9 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -869,6 +869,17 @@ class RegistrationStore( (user_id_obj.localpart, create_profile_with_displayname), ) + if self.hs.config.stats_enabled: + # we create a new completed user statistics row + + # we don't strictly need current_token since this user really can't + # have any state deltas before now (as it is a new user), but still, + # we include it for completeness. + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + self._update_stats_delta_txn( + txn, now, "user", user_id, {}, complete_with_stream_id=current_token + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) @@ -1140,6 +1151,7 @@ class RegistrationStore( deferred str|None: A str representing a link to redirect the user to if there is one. 
""" + # Insert everything into a transaction in order to run atomically def validate_threepid_session_txn(txn): row = self._simple_select_one_txn( diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index eecb276465..f8b682ebd9 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -112,29 +112,31 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(max_entries=100000, iterable=True) def get_users_in_room(self, room_id): - def f(txn): - # If we can assume current_state_events.membership is up to date - # then we can avoid a join, which is a Very Good Thing given how - # frequently this function gets called. - if self._current_state_events_membership_up_to_date: - sql = """ - SELECT state_key FROM current_state_events - WHERE type = 'm.room.member' AND room_id = ? AND membership = ? - """ - else: - sql = """ - SELECT state_key FROM room_memberships as m - INNER JOIN current_state_events as c - ON m.event_id = c.event_id - AND m.room_id = c.room_id - AND m.user_id = c.state_key - WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? - """ + return self.runInteraction( + "get_users_in_room", self.get_users_in_room_txn, room_id + ) - txn.execute(sql, (room_id, Membership.JOIN)) - return [to_ascii(r[0]) for r in txn] + def get_users_in_room_txn(self, txn, room_id): + # If we can assume current_state_events.membership is up to date + # then we can avoid a join, which is a Very Good Thing given how + # frequently this function gets called. + if self._current_state_events_membership_up_to_date: + sql = """ + SELECT state_key FROM current_state_events + WHERE type = 'm.room.member' AND room_id = ? AND membership = ? + """ + else: + sql = """ + SELECT state_key FROM room_memberships as m + INNER JOIN current_state_events as c + ON m.event_id = c.event_id + AND m.room_id = c.room_id + AND m.user_id = c.state_key + WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? 
+ """ - return self.runInteraction("get_users_in_room", f) + txn.execute(sql, (room_id, Membership.JOIN)) + return [to_ascii(r[0]) for r in txn] @cached(max_entries=100000) def get_room_summary(self, room_id): diff --git a/synapse/storage/schema/delta/56/stats_separated.sql b/synapse/storage/schema/delta/56/stats_separated.sql new file mode 100644 index 0000000000..163529c071 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated.sql @@ -0,0 +1,152 @@ +/* Copyright 2018 New Vector Ltd + * Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +----- First clean up from previous versions of room stats. 
+ +-- First remove old stats stuff +DROP TABLE IF EXISTS room_stats; +DROP TABLE IF EXISTS room_state; +DROP TABLE IF EXISTS room_stats_state; +DROP TABLE IF EXISTS user_stats; +DROP TABLE IF EXISTS room_stats_earliest_tokens; +DROP TABLE IF EXISTS _temp_populate_stats_position; +DROP TABLE IF EXISTS _temp_populate_stats_rooms; +DROP TABLE IF EXISTS stats_stream_pos; + +-- Unschedule old background updates if they're still scheduled +DELETE FROM background_updates WHERE update_name IN ( + 'populate_stats_createtables', + 'populate_stats_process_rooms', + 'populate_stats_process_users', + 'populate_stats_cleanup' +); + +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_rooms', '{}', ''); + +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_users', '{}', 'populate_stats_process_rooms'); + +----- Create tables for our version of room stats. + +-- single-row table to track position of incremental updates +DROP TABLE IF EXISTS stats_incremental_position; +CREATE TABLE stats_incremental_position ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +-- insert a null row and make sure it is the only one. +INSERT INTO stats_incremental_position ( + stream_id +) SELECT COALESCE(MAX(stream_ordering), 0) from events; + +-- represents PRESENT room statistics for a room +-- only holds absolute fields +DROP TABLE IF EXISTS room_stats_current; +CREATE TABLE room_stats_current ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- These are absolute counts + current_state_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + + local_users_in_room INT NOT NULL, + + -- The maximum delta stream position that this row takes into account. 
+ completed_delta_stream_id BIGINT NOT NULL +); + + +-- represents HISTORICAL room statistics for a room +DROP TABLE IF EXISTS room_stats_historical; +CREATE TABLE room_stats_historical ( + room_id TEXT NOT NULL, + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms). + -- Note that end_ts is quantised. + end_ts BIGINT NOT NULL, + bucket_size BIGINT NOT NULL, + + -- These stats are absolute counts + current_state_events BIGINT NOT NULL, + joined_members BIGINT NOT NULL, + invited_members BIGINT NOT NULL, + left_members BIGINT NOT NULL, + banned_members BIGINT NOT NULL, + local_users_in_room BIGINT NOT NULL, + + -- These stats are per time slice + total_events BIGINT NOT NULL, + total_event_bytes BIGINT NOT NULL, + + PRIMARY KEY (room_id, end_ts) +); + +-- We use this index to speed up deletion of ancient room stats. +CREATE INDEX room_stats_historical_end_ts ON room_stats_historical (end_ts); + +-- represents PRESENT statistics for a user +-- only holds absolute fields +DROP TABLE IF EXISTS user_stats_current; +CREATE TABLE user_stats_current ( + user_id TEXT NOT NULL PRIMARY KEY, + + joined_rooms BIGINT NOT NULL, + + -- The maximum delta stream position that this row takes into account. + completed_delta_stream_id BIGINT NOT NULL +); + +-- represents HISTORICAL statistics for a user +DROP TABLE IF EXISTS user_stats_historical; +CREATE TABLE user_stats_historical ( + user_id TEXT NOT NULL, + end_ts BIGINT NOT NULL, + bucket_size BIGINT NOT NULL, + + joined_rooms BIGINT NOT NULL, + + invites_sent BIGINT NOT NULL, + rooms_created BIGINT NOT NULL, + total_events BIGINT NOT NULL, + total_event_bytes BIGINT NOT NULL, + + PRIMARY KEY (user_id, end_ts) +); + +-- We use this index to speed up deletion of ancient user stats. 
+CREATE INDEX user_stats_historical_end_ts ON user_stats_historical (end_ts); + + +CREATE TABLE room_stats_state ( + room_id TEXT NOT NULL, + name TEXT, + canonical_alias TEXT, + join_rules TEXT, + history_visibility TEXT, + encryption TEXT, + avatar TEXT, + guest_access TEXT, + is_federatable BOOLEAN, + topic TEXT +); + +CREATE UNIQUE INDEX room_stats_state_room ON room_stats_state(room_id); diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e13efed417..6560173c08 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,17 +15,22 @@ # limitations under the License. import logging +from itertools import chain from twisted.internet import defer +from twisted.internet.defer import DeferredLock from synapse.api.constants import EventTypes, Membership -from synapse.storage.prepare_database import get_statements +from synapse.storage import PostgresEngine from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) +# You can think of these as Prometheus Gauges. +# You can draw these stats on a line graph. 
+# Example: number of users in a room ABSOLUTE_STATS_FIELDS = { "room": ( "current_state_events", @@ -32,14 +38,23 @@ ABSOLUTE_STATS_FIELDS = { "invited_members", "left_members", "banned_members", - "state_events", + "local_users_in_room", ), - "user": ("public_rooms", "private_rooms"), + "user": ("joined_rooms",), } -TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} +# these fields are per-timeslice and so should be reset to 0 upon a new slice +# You can draw these stats on a histogram. +# Example: number of events sent locally during a time slice +PER_SLICE_FIELDS = { + "room": ("total_events", "total_event_bytes"), + "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"), +} + +TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -TEMP_TABLE = "_temp_populate_stats" +# these are the tables (& ID columns) which contain our actual subjects +TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")} class StatsStore(StateDeltasStore): @@ -51,136 +66,102 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size - self.register_background_update_handler( - "populate_stats_createtables", self._populate_stats_createtables - ) + self.stats_delta_processing_lock = DeferredLock() + self.register_background_update_handler( "populate_stats_process_rooms", self._populate_stats_process_rooms ) self.register_background_update_handler( - "populate_stats_cleanup", self._populate_stats_cleanup + "populate_stats_process_users", self._populate_stats_process_users ) + # we no longer need to perform clean-up, but we will give ourselves + # the potential to reintroduce it in the future – so documentation + # will still encourage the use of this no-op handler. 
+ self.register_noop_background_update("populate_stats_cleanup") + self.register_noop_background_update("populate_stats_prepare") - @defer.inlineCallbacks - def _populate_stats_createtables(self, progress, batch_size): - - if not self.stats_enabled: - yield self._end_background_update("populate_stats_createtables") - return 1 - - # Get all the rooms that we want to process. - def _make_staging_area(txn): - # Create the temporary tables - stmts = get_statements( - """ - -- We just recreate the table, we'll be reinserting the - -- correct entries again later anyway. - DROP TABLE IF EXISTS {temp}_rooms; - - CREATE TABLE IF NOT EXISTS {temp}_rooms( - room_id TEXT NOT NULL, - events BIGINT NOT NULL - ); - - CREATE INDEX {temp}_rooms_events - ON {temp}_rooms(events); - CREATE INDEX {temp}_rooms_id - ON {temp}_rooms(room_id); - """.format( - temp=TEMP_TABLE - ).splitlines() - ) - - for statement in stmts: - txn.execute(statement) - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) - txn.execute(sql) - - # Get rooms we want to process from the database, only adding - # those that we haven't (i.e. those not in room_stats_earliest_token) - sql = """ - INSERT INTO %s_rooms (room_id, events) - SELECT c.room_id, count(*) FROM current_state_events AS c - LEFT JOIN room_stats_earliest_token AS t USING (room_id) - WHERE t.room_id IS NULL - GROUP BY c.room_id - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) + def quantise_stats_time(self, ts): + """ + Quantises a timestamp to be a multiple of the bucket size. 
- new_pos = yield self.get_max_stream_id_in_current_state_deltas() - yield self.runInteraction("populate_stats_temp_build", _make_staging_area) - yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) - self.get_earliest_token_for_room_stats.invalidate_all() + Args: + ts (int): the timestamp to quantise, in milliseconds since the Unix + Epoch - yield self._end_background_update("populate_stats_createtables") - return 1 + Returns: + int: a timestamp which + - is divisible by the bucket size; + - is no later than `ts`; and + - is the largest such timestamp. + """ + return (ts // self.stats_bucket_size) * self.stats_bucket_size @defer.inlineCallbacks - def _populate_stats_cleanup(self, progress, batch_size): + def _populate_stats_process_users(self, progress, batch_size): """ - Update the user directory stream position, then clean up the old tables. + This is a background update which regenerates statistics for users. """ if not self.stats_enabled: - yield self._end_background_update("populate_stats_cleanup") + yield self._end_background_update("populate_stats_process_users") return 1 - position = yield self._simple_select_one_onecol( - TEMP_TABLE + "_position", None, "position" + last_user_id = progress.get("last_user_id", "") + + def _get_next_batch(txn): + sql = """ + SELECT DISTINCT name FROM users + WHERE name > ? + ORDER BY name ASC + LIMIT ? + """ + txn.execute(sql, (last_user_id, batch_size)) + return [r for r, in txn] + + users_to_work_on = yield self.runInteraction( + "_populate_stats_process_users", _get_next_batch ) - yield self.update_stats_stream_pos(position) - def _delete_staging_area(txn): - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") + # No more users -- complete the background update. 
+ if not users_to_work_on: + yield self._end_background_update("populate_stats_process_users") + return 1 - yield self.runInteraction("populate_stats_cleanup", _delete_staging_area) + for user_id in users_to_work_on: + yield self._calculate_and_set_initial_state_for_user(user_id) + progress["last_user_id"] = user_id - yield self._end_background_update("populate_stats_cleanup") - return 1 + yield self.runInteraction( + "populate_stats_process_users", + self._background_update_progress_txn, + "populate_stats_process_users", + progress, + ) + + return len(users_to_work_on) @defer.inlineCallbacks def _populate_stats_process_rooms(self, progress, batch_size): - + """ + This is a background update which regenerates statistics for rooms. + """ if not self.stats_enabled: yield self._end_background_update("populate_stats_process_rooms") return 1 - # If we don't have progress filed, delete everything. - if not progress: - yield self.delete_all_stats() + last_room_id = progress.get("last_room_id", "") def _get_next_batch(txn): - # Only fetch 250 rooms, so we don't fetch too many at once, even - # if those 250 rooms have less than batch_size state events. sql = """ - SELECT room_id, events FROM %s_rooms - ORDER BY events DESC - LIMIT 250 - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) - rooms_to_work_on = txn.fetchall() - - if not rooms_to_work_on: - return None - - # Get how many are left to process, so we can give status on how - # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") - progress["remaining"] = txn.fetchone()[0] - - return rooms_to_work_on + SELECT DISTINCT room_id FROM current_state_events + WHERE room_id > ? + ORDER BY room_id ASC + LIMIT ? + """ + txn.execute(sql, (last_room_id, batch_size)) + return [r for r, in txn] rooms_to_work_on = yield self.runInteraction( - "populate_stats_temp_read", _get_next_batch + "populate_stats_rooms_get_batch", _get_next_batch ) # No more rooms -- complete the transaction. 
@@ -188,154 +169,28 @@ class StatsStore(StateDeltasStore): yield self._end_background_update("populate_stats_process_rooms") return 1 - logger.info( - "Processing the next %d rooms of %d remaining", - len(rooms_to_work_on), - progress["remaining"], - ) - - # Number of state events we've processed by going through each room - processed_event_count = 0 - - for room_id, event_count in rooms_to_work_on: - - current_state_ids = yield self.get_current_state_ids(room_id) - - join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) - history_visibility_id = current_state_ids.get( - (EventTypes.RoomHistoryVisibility, "") - ) - encryption_id = current_state_ids.get((EventTypes.RoomEncryption, "")) - name_id = current_state_ids.get((EventTypes.Name, "")) - topic_id = current_state_ids.get((EventTypes.Topic, "")) - avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) - canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) - - event_ids = [ - join_rules_id, - history_visibility_id, - encryption_id, - name_id, - topic_id, - avatar_id, - canonical_alias_id, - ] - - state_events = yield self.get_events( - [ev for ev in event_ids if ev is not None] - ) - - def _get_or_none(event_id, arg): - event = state_events.get(event_id) - if event: - return event.content.get(arg) - return None - - yield self.update_room_state( - room_id, - { - "join_rules": _get_or_none(join_rules_id, "join_rule"), - "history_visibility": _get_or_none( - history_visibility_id, "history_visibility" - ), - "encryption": _get_or_none(encryption_id, "algorithm"), - "name": _get_or_none(name_id, "name"), - "topic": _get_or_none(topic_id, "topic"), - "avatar": _get_or_none(avatar_id, "url"), - "canonical_alias": _get_or_none(canonical_alias_id, "alias"), - }, - ) + for room_id in rooms_to_work_on: + yield self._calculate_and_set_initial_state_for_room(room_id) + progress["last_room_id"] = room_id - now = self.hs.get_reactor().seconds() - - # quantise time to the nearest 
bucket - now = (now // self.stats_bucket_size) * self.stats_bucket_size - - def _fetch_data(txn): - - # Get the current token of the room - current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) - - current_state_events = len(current_state_ids) - - membership_counts = self._get_user_counts_in_room_txn(txn, room_id) - - total_state_events = self._get_total_state_event_counts_txn( - txn, room_id - ) - - self._update_stats_txn( - txn, - "room", - room_id, - now, - { - "bucket_size": self.stats_bucket_size, - "current_state_events": current_state_events, - "joined_members": membership_counts.get(Membership.JOIN, 0), - "invited_members": membership_counts.get(Membership.INVITE, 0), - "left_members": membership_counts.get(Membership.LEAVE, 0), - "banned_members": membership_counts.get(Membership.BAN, 0), - "state_events": total_state_events, - }, - ) - self._simple_insert_txn( - txn, - "room_stats_earliest_token", - {"room_id": room_id, "token": current_token}, - ) - - # We've finished a room. Delete it from the table. - self._simple_delete_one_txn( - txn, TEMP_TABLE + "_rooms", {"room_id": room_id} - ) - - yield self.runInteraction("update_room_stats", _fetch_data) - - # Update the remaining counter. - progress["remaining"] -= 1 - yield self.runInteraction( - "populate_stats", - self._background_update_progress_txn, - "populate_stats_process_rooms", - progress, - ) - - processed_event_count += event_count - - if processed_event_count > batch_size: - # Don't process any more rooms, we've hit our batch size. - return processed_event_count + yield self.runInteraction( + "_populate_stats_process_rooms", + self._background_update_progress_txn, + "populate_stats_process_rooms", + progress, + ) - return processed_event_count + return len(rooms_to_work_on) - def delete_all_stats(self): + def get_stats_positions(self): """ - Delete all statistics records. + Returns the stats processor positions. 
""" - - def _delete_all_stats_txn(txn): - txn.execute("DELETE FROM room_state") - txn.execute("DELETE FROM room_stats") - txn.execute("DELETE FROM room_stats_earliest_token") - txn.execute("DELETE FROM user_stats") - - return self.runInteraction("delete_all_stats", _delete_all_stats_txn) - - def get_stats_stream_pos(self): return self._simple_select_one_onecol( - table="stats_stream_pos", + table="stats_incremental_position", keyvalues={}, retcol="stream_id", - desc="stats_stream_pos", - ) - - def update_stats_stream_pos(self, stream_id): - return self._simple_update_one( - table="stats_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_stats_stream_pos", + desc="stats_incremental_position", ) def update_room_state(self, room_id, fields): @@ -361,42 +216,87 @@ class StatsStore(StateDeltasStore): fields[col] = None return self._simple_upsert( - table="room_state", + table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, desc="update_room_state", ) - def get_deltas_for_room(self, room_id, start, size=100): + def get_statistics_for_subject(self, stats_type, stats_id, start, size=100): """ - Get statistics deltas for a given room. + Get statistics for a given subject. Args: - room_id (str) + stats_type (str): The type of subject + stats_id (str): The ID of the subject (e.g. room_id or user_id) start (int): Pagination start. Number of entries, not timestamp. size (int): How many entries to return. Returns: Deferred[list[dict]], where the dict has the keys of - ABSOLUTE_STATS_FIELDS["room"] and "ts". + ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts". 
""" - return self._simple_select_list_paginate( - "room_stats", - {"room_id": room_id}, - "ts", + return self.runInteraction( + "get_statistics_for_subject", + self._get_statistics_for_subject_txn, + stats_type, + stats_id, + start, + size, + ) + + def _get_statistics_for_subject_txn( + self, txn, stats_type, stats_id, start, size=100 + ): + """ + Transaction-bound version of L{get_statistics_for_subject}. + """ + + table, id_col = TYPE_TO_TABLE[stats_type] + selected_columns = list( + ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type] + ) + + slice_list = self._simple_select_list_paginate_txn( + txn, + table + "_historical", + {id_col: stats_id}, + "end_ts", start, size, - retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]), + retcols=selected_columns + ["bucket_size", "end_ts"], order_direction="DESC", ) - def get_all_room_state(self): - return self._simple_select_list( - "room_state", None, retcols=("name", "topic", "canonical_alias") + return slice_list + + def get_room_stats_state(self, room_id): + """ + Returns the current room_stats_state for a room. + + Args: + room_id (str): The ID of the room to return state for. + + Returns (dict): + Dictionary containing these keys: + "name", "topic", "canonical_alias", "avatar", "join_rules", + "history_visibility" + """ + return self._simple_select_one( + "room_stats_state", + {"room_id": room_id}, + retcols=( + "name", + "topic", + "canonical_alias", + "avatar", + "join_rules", + "history_visibility", + ), ) @cached() - def get_earliest_token_for_room_stats(self, room_id): + def get_earliest_token_for_stats(self, stats_type, id): """ Fetch the "earliest token". 
This is used by the room stats delta processor to ignore deltas that have been processed between the @@ -406,79 +306,571 @@ class StatsStore(StateDeltasStore): Returns: Deferred[int] """ + table, id_col = TYPE_TO_TABLE[stats_type] + return self._simple_select_one_onecol( - "room_stats_earliest_token", - {"room_id": room_id}, - retcol="token", + "%s_current" % (table,), + keyvalues={id_col: id}, + retcol="completed_delta_stream_id", allow_none=True, ) - def update_stats(self, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert( - table=table, - keyvalues={id_col: stats_id, "ts": ts}, - values=fields, - desc="update_stats", + def bulk_update_stats_delta(self, ts, updates, stream_id): + """Bulk update stats tables for a given stream_id and updates the stats + incremental position. + + Args: + ts (int): Current timestamp in ms + updates(dict[str, dict[str, dict[str, Counter]]]): The updates to + commit as a mapping stats_type -> stats_id -> field -> delta. + stream_id (int): Current position. 
+ + Returns: + Deferred + """ + + def _bulk_update_stats_delta_txn(txn): + for stats_type, stats_updates in updates.items(): + for stats_id, fields in stats_updates.items(): + self._update_stats_delta_txn( + txn, + ts=ts, + stats_type=stats_type, + stats_id=stats_id, + fields=fields, + complete_with_stream_id=stream_id, + ) + + self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + ) + + return self.runInteraction( + "bulk_update_stats_delta", _bulk_update_stats_delta_txn ) - def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert_txn( - txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields + def update_stats_delta( + self, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id, + absolute_field_overrides=None, + ): + """ + Updates the statistics for a subject, with a delta (difference/relative + change). + + Args: + ts (int): timestamp of the change + stats_type (str): "room" or "user" – the kind of subject + stats_id (str): the subject's ID (room ID or user ID) + fields (dict[str, int]): Deltas of stats values. + complete_with_stream_id (int, optional): + If supplied, converts an incomplete row into a complete row, + with the supplied stream_id marked as the stream_id where the + row was completed. + absolute_field_overrides (dict[str, int]): Current stats values + (i.e. not deltas) of absolute fields. + Does not work with per-slice fields. + """ + + return self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + absolute_field_overrides=absolute_field_overrides, ) - def update_stats_delta(self, ts, stats_type, stats_id, field, value): - def _update_stats_delta(txn): - table, id_col = TYPE_TO_ROOM[stats_type] - - sql = ( - "SELECT * FROM %s" - " WHERE %s=? 
and ts=(" - " SELECT MAX(ts) FROM %s" - " WHERE %s=?" - ")" - ) % (table, id_col, table, id_col) - txn.execute(sql, (stats_id, stats_id)) - rows = self.cursor_to_dict(txn) - if len(rows) == 0: - # silently skip as we don't have anything to apply a delta to yet. - # this tries to minimise any race between the initial sync and - # subsequent deltas arriving. - return - - current_ts = ts - latest_ts = rows[0]["ts"] - if current_ts < latest_ts: - # This one is in the past, but we're just encountering it now. - # Mark it as part of the current bucket. - current_ts = latest_ts - elif ts != latest_ts: - # we have to copy our absolute counters over to the new entry. - values = { - key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] - } - values[id_col] = stats_id - values["ts"] = ts - values["bucket_size"] = self.stats_bucket_size - - self._simple_insert_txn(txn, table=table, values=values) - - # actually update the new value - if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: - self._simple_update_txn( - txn, - table=table, - keyvalues={id_col: stats_id, "ts": current_ts}, - updatevalues={field: value}, + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id, + absolute_field_overrides=None, + ): + if absolute_field_overrides is None: + absolute_field_overrides = {} + + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + # Lets be paranoid and check that all the given field names are known + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] + for field in chain(fields.keys(), absolute_field_overrides.keys()): + if field not in abs_field_names and field not in slice_field_names: + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) ) + + # Per slice fields do not 
get added to the _current table + + # This calculates the deltas (`field = field + ?` values) + # for absolute fields, + # * defaulting to 0 if not specified + # (required for the INSERT part of upserting to work) + # * omitting overrides specified in `absolute_field_overrides` + deltas_of_absolute_fields = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_field_overrides + } + + # Keep the delta stream ID field up to date + absolute_field_overrides = absolute_field_overrides.copy() + absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id + + # first upsert the `_current` table + self._upsert_with_additive_relatives_txn( + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_field_overrides, + additive_relatives=deltas_of_absolute_fields, + ) + + per_slice_additive_relatives = { + key: fields.get(key, 0) for key in slice_field_names + } + self._upsert_copy_from_table_with_additive_relatives_txn( + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_insvalues={"bucket_size": self.stats_bucket_size}, + extra_dst_keyvalues={"end_ts": end_ts}, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, + ) + + def _upsert_with_additive_relatives_txn( + self, txn, table, keyvalues, absolutes, additive_relatives + ): + """Used to update values in the stats tables. + + This is basically a slightly convoluted upsert that *adds* to any + existing rows. + + Args: + txn + table (str): Table name + keyvalues (dict[str, any]): Row-identifying key values + absolutes (dict[str, any]): Absolute (set) fields + additive_relatives (dict[str, int]): Fields that will be added onto + if existing row present. 
+ """ + if self.database_engine.can_native_upsert: + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "key_columns": ", ".join(keyvalues), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } + + txn.execute(sql, qargs) + else: + self.database_engine.lock_table(txn, table) + retcols = list(chain(absolutes.keys(), additive_relatives.keys())) + current_row = self._simple_select_one_txn( + txn, table, keyvalues, retcols, allow_none=True + ) + if current_row is None: + merged_dict = {**keyvalues, **absolutes, **additive_relatives} + self._simple_insert_txn(txn, table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + current_row[key] += val + current_row.update(absolutes) + self._simple_update_one_txn(txn, table, keyvalues, current_row) + + def _upsert_copy_from_table_with_additive_relatives_txn( + self, + txn, + into_table, + keyvalues, + extra_dst_keyvalues, + extra_dst_insvalues, + additive_relatives, + src_table, + copy_columns, + ): + """Updates the historic stats table with latest updates. + + This involves copying "absolute" fields from the `_current` table, and + adding relative fields to any existing values. 
+ + Args: + txn: Transaction + into_table (str): The destination table to UPSERT the row into + keyvalues (dict[str, any]): Row-identifying key values + extra_dst_keyvalues (dict[str, any]): Additional keyvalues + for `into_table`. + extra_dst_insvalues (dict[str, any]): Additional values to insert + on new row creation for `into_table`. + additive_relatives (dict[str, any]): Fields that will be added onto + if existing row present. (Must be disjoint from copy_columns.) + src_table (str): The source table to copy from + copy_columns (iterable[str]): The list of columns to copy + """ + if self.database_engine.can_native_upsert: + ins_columns = chain( + keyvalues, + copy_columns, + additive_relatives, + extra_dst_keyvalues, + extra_dst_insvalues, + ) + sel_exprs = chain( + keyvalues, + copy_columns, + ( + "?" + for _ in chain( + additive_relatives, extra_dst_keyvalues, extra_dst_insvalues + ) + ), + ) + keyvalues_where = ("%s = ?" % f for f in keyvalues) + + sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) + sets_ar = ( + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) + for f in additive_relatives + ) + + sql = """ + INSERT INTO %(into_table)s (%(ins_columns)s) + SELECT %(sel_exprs)s + FROM %(src_table)s + WHERE %(keyvalues_where)s + ON CONFLICT (%(keyvalues)s) + DO UPDATE SET %(sets)s + """ % { + "into_table": into_table, + "ins_columns": ", ".join(ins_columns), + "sel_exprs": ", ".join(sel_exprs), + "keyvalues_where": " AND ".join(keyvalues_where), + "src_table": src_table, + "keyvalues": ", ".join( + chain(keyvalues.keys(), extra_dst_keyvalues.keys()) + ), + "sets": ", ".join(chain(sets_cc, sets_ar)), + } + + qargs = list( + chain( + additive_relatives.values(), + extra_dst_keyvalues.values(), + extra_dst_insvalues.values(), + keyvalues.values(), + ) + ) + txn.execute(sql, qargs) + else: + self.database_engine.lock_table(txn, into_table) + src_row = self._simple_select_one_txn( + txn, src_table, keyvalues, copy_columns + ) + all_dest_keyvalues = 
{**keyvalues, **extra_dst_keyvalues} + dest_current_row = self._simple_select_one_txn( + txn, + into_table, + keyvalues=all_dest_keyvalues, + retcols=list(chain(additive_relatives.keys(), copy_columns)), + allow_none=True, + ) + + if dest_current_row is None: + merged_dict = { + **keyvalues, + **extra_dst_keyvalues, + **extra_dst_insvalues, + **src_row, + **additive_relatives, + } + self._simple_insert_txn(txn, into_table, merged_dict) else: - sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( - table, - field, - field, - id_col, + for (key, val) in additive_relatives.items(): + src_row[key] = dest_current_row[key] + val + self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row) + + def get_changes_room_total_events_and_bytes(self, min_pos, max_pos): + """Fetches the counts of events in the given range of stream IDs. + + Args: + min_pos (int) + max_pos (int) + + Returns: + Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field + changes. + """ + + return self.runInteraction( + "stats_incremental_total_events_and_bytes", + self.get_changes_room_total_events_and_bytes_txn, + min_pos, + max_pos, + ) + + def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos): + """Gets the total_events and total_event_bytes counts for rooms and + senders, in a range of stream_orderings (including backfilled events). + + Args: + txn + low_pos (int): Low stream ordering + high_pos (int): High stream ordering + + Returns: + tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The + room and user deltas for total_events/total_event_bytes in the + format of `stats_id` -> fields + """ + + if low_pos >= high_pos: + # nothing to do here. 
+ return {}, {} + + if isinstance(self.database_engine, PostgresEngine): + new_bytes_expression = "OCTET_LENGTH(json)" + else: + new_bytes_expression = "LENGTH(CAST(json AS BLOB))" + + sql = """ + SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE (? < stream_ordering AND stream_ordering <= ?) + OR (? <= stream_ordering AND stream_ordering <= ?) + GROUP BY events.room_id + """ % ( + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) + + room_deltas = { + room_id: {"total_events": new_events, "total_event_bytes": new_bytes} + for room_id, new_events, new_bytes in txn + } + + sql = """ + SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE (? < stream_ordering AND stream_ordering <= ?) + OR (? <= stream_ordering AND stream_ordering <= ?) + GROUP BY events.sender + """ % ( + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) + + user_deltas = { + user_id: {"total_events": new_events, "total_event_bytes": new_bytes} + for user_id, new_events, new_bytes in txn + if self.hs.is_mine_id(user_id) + } + + return room_deltas, user_deltas + + @defer.inlineCallbacks + def _calculate_and_set_initial_state_for_room(self, room_id): + """Calculate and insert an entry into room_stats_current. + + Args: + room_id (str) + + Returns: + Deferred[tuple[dict, dict, int]]: A tuple of room state, membership + counts and stream position. 
+ """ + + def _fetch_current_state_stats(txn): + pos = self.get_room_max_stream_ordering() + + rows = self._simple_select_many_txn( + txn, + table="current_state_events", + column="type", + iterable=[ + EventTypes.Create, + EventTypes.JoinRules, + EventTypes.RoomHistoryVisibility, + EventTypes.Encryption, + EventTypes.Name, + EventTypes.Topic, + EventTypes.RoomAvatar, + EventTypes.CanonicalAlias, + ], + keyvalues={"room_id": room_id, "state_key": ""}, + retcols=["event_id"], + ) + + event_ids = [row["event_id"] for row in rows] + + txn.execute( + """ + SELECT membership, count(*) FROM current_state_events + WHERE room_id = ? AND type = 'm.room.member' + GROUP BY membership + """, + (room_id,), + ) + membership_counts = {membership: cnt for membership, cnt in txn} + + txn.execute( + """ + SELECT COALESCE(count(*), 0) FROM current_state_events + WHERE room_id = ? + """, + (room_id,), + ) + + current_state_events_count, = txn.fetchone() + + users_in_room = self.get_users_in_room_txn(txn, room_id) + + return ( + event_ids, + membership_counts, + current_state_events_count, + users_in_room, + pos, + ) + + ( + event_ids, + membership_counts, + current_state_events_count, + users_in_room, + pos, + ) = yield self.runInteraction( + "get_initial_state_for_room", _fetch_current_state_stats + ) + + state_event_map = yield self.get_events(event_ids, get_prev_content=False) + + room_state = { + "join_rules": None, + "history_visibility": None, + "encryption": None, + "name": None, + "topic": None, + "avatar": None, + "canonical_alias": None, + "is_federatable": True, + } + + for event in state_event_map.values(): + if event.type == EventTypes.JoinRules: + room_state["join_rules"] = event.content.get("join_rule") + elif event.type == EventTypes.RoomHistoryVisibility: + room_state["history_visibility"] = event.content.get( + "history_visibility" ) - txn.execute(sql, (value, stats_id, current_ts)) + elif event.type == EventTypes.Encryption: + room_state["encryption"] = 
event.content.get("algorithm") + elif event.type == EventTypes.Name: + room_state["name"] = event.content.get("name") + elif event.type == EventTypes.Topic: + room_state["topic"] = event.content.get("topic") + elif event.type == EventTypes.RoomAvatar: + room_state["avatar"] = event.content.get("url") + elif event.type == EventTypes.CanonicalAlias: + room_state["canonical_alias"] = event.content.get("alias") + elif event.type == EventTypes.Create: + room_state["is_federatable"] = event.content.get("m.federate", True) + + yield self.update_room_state(room_id, room_state) + + local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)] + + yield self.update_stats_delta( + ts=self.clock.time_msec(), + stats_type="room", + stats_id=room_id, + fields={}, + complete_with_stream_id=pos, + absolute_field_overrides={ + "current_state_events": current_state_events_count, + "joined_members": membership_counts.get(Membership.JOIN, 0), + "invited_members": membership_counts.get(Membership.INVITE, 0), + "left_members": membership_counts.get(Membership.LEAVE, 0), + "banned_members": membership_counts.get(Membership.BAN, 0), + "local_users_in_room": len(local_users_in_room), + }, + ) + + @defer.inlineCallbacks + def _calculate_and_set_initial_state_for_user(self, user_id): + def _calculate_and_set_initial_state_for_user_txn(txn): + pos = self._get_max_stream_id_in_current_state_deltas_txn(txn) - return self.runInteraction("update_stats_delta", _update_stats_delta) + txn.execute( + """ + SELECT COUNT(distinct room_id) FROM current_state_events + WHERE type = 'm.room.member' AND state_key = ? 
+ AND membership = 'join' + """, + (user_id,), + ) + count, = txn.fetchone() + return count, pos + + joined_rooms, pos = yield self.runInteraction( + "calculate_and_set_initial_state_for_user", + _calculate_and_set_initial_state_for_user_txn, + ) + + yield self.update_stats_delta( + ts=self.clock.time_msec(), + stats_type="user", + stats_id=user_id, + fields={}, + complete_with_stream_id=pos, + absolute_field_overrides={"joined_rooms": joined_rooms}, + ) diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py index a8b858eb4f..7569b6fab5 100644 --- a/tests/handlers/test_stats.py +++ b/tests/handlers/test_stats.py @@ -13,16 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from mock import Mock - -from twisted.internet import defer - -from synapse.api.constants import EventTypes, Membership +from synapse import storage from synapse.rest import admin from synapse.rest.client.v1 import login, room from tests import unittest +# The expected number of state events in a fresh public room. +EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM = 5 +# The expected number of state events in a fresh private room. 
+EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM = 6 + class StatsRoomTests(unittest.HomeserverTestCase): @@ -33,7 +34,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): ] def prepare(self, reactor, clock, hs): - self.store = hs.get_datastore() self.handler = self.hs.get_stats_handler() @@ -47,7 +47,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.get_success( self.store._simple_insert( "background_updates", - {"update_name": "populate_stats_createtables", "progress_json": "{}"}, + {"update_name": "populate_stats_prepare", "progress_json": "{}"}, ) ) self.get_success( @@ -56,7 +56,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): { "update_name": "populate_stats_process_rooms", "progress_json": "{}", - "depends_on": "populate_stats_createtables", + "depends_on": "populate_stats_prepare", }, ) ) @@ -64,18 +64,58 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.store._simple_insert( "background_updates", { - "update_name": "populate_stats_cleanup", + "update_name": "populate_stats_process_users", "progress_json": "{}", "depends_on": "populate_stats_process_rooms", }, ) ) + self.get_success( + self.store._simple_insert( + "background_updates", + { + "update_name": "populate_stats_cleanup", + "progress_json": "{}", + "depends_on": "populate_stats_process_users", + }, + ) + ) + + def get_all_room_state(self): + return self.store._simple_select_list( + "room_stats_state", None, retcols=("name", "topic", "canonical_alias") + ) + + def _get_current_stats(self, stats_type, stat_id): + table, id_col = storage.stats.TYPE_TO_TABLE[stats_type] + + cols = list(storage.stats.ABSOLUTE_STATS_FIELDS[stats_type]) + list( + storage.stats.PER_SLICE_FIELDS[stats_type] + ) + + end_ts = self.store.quantise_stats_time(self.reactor.seconds() * 1000) + + return self.get_success( + self.store._simple_select_one( + table + "_historical", + {id_col: stat_id, end_ts: end_ts}, + cols, + allow_none=True, + ) + ) + + def _perform_background_initial_update(self): + # Do 
the initial population of the stats via the background update + self._add_background_updates() + + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) def test_initial_room(self): """ The background updates will build the table from scratch. """ - r = self.get_success(self.store.get_all_room_state()) + r = self.get_success(self.get_all_room_state()) self.assertEqual(len(r), 0) # Disable stats @@ -91,7 +131,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): ) # Stats disabled, shouldn't have done anything - r = self.get_success(self.store.get_all_room_state()) + r = self.get_success(self.get_all_room_state()) self.assertEqual(len(r), 0) # Enable stats @@ -104,7 +144,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): while not self.get_success(self.store.has_completed_background_updates()): self.get_success(self.store.do_next_background_update(100), by=0.1) - r = self.get_success(self.store.get_all_room_state()) + r = self.get_success(self.get_all_room_state()) self.assertEqual(len(r), 1) self.assertEqual(r[0]["topic"], "foo") @@ -114,6 +154,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): Ingestion via notify_new_event will ignore tokens that the background update have already processed. 
""" + self.reactor.advance(86401) self.hs.config.stats_enabled = False @@ -138,12 +179,18 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.hs.config.stats_enabled = True self.handler.stats_enabled = True self.store._all_done = False - self.get_success(self.store.update_stats_stream_pos(None)) + self.get_success( + self.store._simple_update_one( + table="stats_incremental_position", + keyvalues={}, + updatevalues={"stream_id": 0}, + ) + ) self.get_success( self.store._simple_insert( "background_updates", - {"update_name": "populate_stats_createtables", "progress_json": "{}"}, + {"update_name": "populate_stats_prepare", "progress_json": "{}"}, ) ) @@ -154,6 +201,8 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token) self.helper.join(room=room_1, user=u2, tok=u2_token) + # orig_delta_processor = self.store. + # Now do the initial ingestion. self.get_success( self.store._simple_insert( @@ -185,8 +234,15 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token) self.helper.join(room=room_1, user=u3, tok=u3_token) - # Get the deltas! There should be two -- day 1, and day 2. - r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) + # self.handler.notify_new_event() + + # We need to let the delta processor advance… + self.pump(10 * 60) + + # Get the slices! There should be two -- day 1, and day 2. + r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0)) + + self.assertEqual(len(r), 2) # The oldest has 2 joined members self.assertEqual(r[-1]["joined_members"], 2) @@ -194,111 +250,476 @@ class StatsRoomTests(unittest.HomeserverTestCase): # The newest has 3 self.assertEqual(r[0]["joined_members"], 3) - def test_incorrect_state_transition(self): - """ - If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to - (JOIN, INVITE, LEAVE, BAN), an error is raised. 
- """ - events = { - "a1": {"membership": Membership.LEAVE}, - "a2": {"membership": "not a real thing"}, - } - - def get_event(event_id, allow_none=True): - m = Mock() - m.content = events[event_id] - d = defer.Deferred() - self.reactor.callLater(0.0, d.callback, m) - return d - - def get_received_ts(event_id): - return defer.succeed(1) - - self.store.get_received_ts = get_received_ts - self.store.get_event = get_event - - deltas = [ - { - "type": EventTypes.Member, - "state_key": "some_user", - "room_id": "room", - "event_id": "a1", - "prev_event_id": "a2", - "stream_id": 60, - } - ] - - f = self.get_failure(self.handler._handle_deltas(deltas), ValueError) + def test_create_user(self): + """ + When we create a user, it should have statistics already ready. + """ + + u1 = self.register_user("u1", "pass") + + u1stats = self._get_current_stats("user", u1) + + self.assertIsNotNone(u1stats) + + # not in any rooms by default + self.assertEqual(u1stats["joined_rooms"], 0) + + def test_create_room(self): + """ + When we create a room, it should have statistics already ready. + """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + r1stats = self._get_current_stats("room", r1) + r2 = self.helper.create_room_as(u1, tok=u1token, is_public=False) + r2stats = self._get_current_stats("room", r2) + + self.assertIsNotNone(r1stats) + self.assertIsNotNone(r2stats) + + # contains the default things you'd expect in a fresh room self.assertEqual( - f.value.args[0], "'not a real thing' is not a valid prev_membership" - ) - - # And the other way... 
- deltas = [ - { - "type": EventTypes.Member, - "state_key": "some_user", - "room_id": "room", - "event_id": "a2", - "prev_event_id": "a1", - "stream_id": 100, - } - ] - - f = self.get_failure(self.handler._handle_deltas(deltas), ValueError) + r1stats["total_events"], + EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM, + "Wrong number of total_events in new room's stats!" + " You may need to update this if more state events are added to" + " the room creation process.", + ) self.assertEqual( - f.value.args[0], "'not a real thing' is not a valid membership" + r2stats["total_events"], + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM, + "Wrong number of total_events in new room's stats!" + " You may need to update this if more state events are added to" + " the room creation process.", ) - def test_redacted_prev_event(self): + self.assertEqual( + r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM + ) + self.assertEqual( + r2stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM + ) + + self.assertEqual(r1stats["joined_members"], 1) + self.assertEqual(r1stats["invited_members"], 0) + self.assertEqual(r1stats["banned_members"], 0) + + self.assertEqual(r2stats["joined_members"], 1) + self.assertEqual(r2stats["invited_members"], 0) + self.assertEqual(r2stats["banned_members"], 0) + + def test_send_message_increments_total_events(self): """ - If the prev_event does not exist, then it is assumed to be a LEAVE. + When we send a message, it increments total_events. 
""" + + self._perform_background_initial_update() + u1 = self.register_user("u1", "pass") - u1_token = self.login("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + r1stats_ante = self._get_current_stats("room", r1) - room_1 = self.helper.create_room_as(u1, tok=u1_token) + self.helper.send(r1, "hiss", tok=u1token) - # Do the initial population of the user directory via the background update - self._add_background_updates() + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + + def test_send_state_event_nonoverwriting(self): + """ + When we send a non-overwriting state event, it increments total_events AND current_state_events + """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + self.helper.send_state( + r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby" + ) + + r1stats_ante = self._get_current_stats("room", r1) + + self.helper.send_state( + r1, "cat.hissing", {"value": False}, tok=u1token, state_key="moggy" + ) + + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + self.assertEqual( + r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], + 1, + ) + + def test_send_state_event_overwriting(self): + """ + When we send an overwriting state event, it increments total_events ONLY + """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + self.helper.send_state( + r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby" + ) + + r1stats_ante = self._get_current_stats("room", r1) + + self.helper.send_state( + r1, "cat.hissing", {"value": False}, 
tok=u1token, state_key="tabby" + ) + + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + self.assertEqual( + r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], + 0, + ) + + def test_join_first_time(self): + """ + When a user joins a room for the first time, total_events, current_state_events and + joined_members should increase by exactly 1. + """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + u2 = self.register_user("u2", "pass") + u2token = self.login("u2", "pass") + + r1stats_ante = self._get_current_stats("room", r1) + + self.helper.join(r1, u2, tok=u2token) + + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + self.assertEqual( + r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], + 1, + ) + self.assertEqual( + r1stats_post["joined_members"] - r1stats_ante["joined_members"], 1 + ) + + def test_join_after_leave(self): + """ + When a user joins a room after being previously left, total_events and + joined_members should increase by exactly 1. + current_state_events should not increase. + left_members should decrease by exactly 1. 
+ """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + u2 = self.register_user("u2", "pass") + u2token = self.login("u2", "pass") + + self.helper.join(r1, u2, tok=u2token) + self.helper.leave(r1, u2, tok=u2token) + + r1stats_ante = self._get_current_stats("room", r1) + + self.helper.join(r1, u2, tok=u2token) + + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + self.assertEqual( + r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], + 0, + ) + self.assertEqual( + r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1 + ) + self.assertEqual( + r1stats_post["left_members"] - r1stats_ante["left_members"], -1 + ) + + def test_invited(self): + """ + When a user invites another user, current_state_events, total_events and + invited_members should increase by exactly 1. + """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + u2 = self.register_user("u2", "pass") + + r1stats_ante = self._get_current_stats("room", r1) + + self.helper.invite(r1, u1, u2, tok=u1token) + + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + self.assertEqual( + r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], + 1, + ) + self.assertEqual( + r1stats_post["invited_members"] - r1stats_ante["invited_members"], +1 + ) + + def test_join_after_invite(self): + """ + When a user joins a room after being invited, total_events and + joined_members should increase by exactly 1. + current_state_events should not increase. + invited_members should decrease by exactly 1. 
+ """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + u2 = self.register_user("u2", "pass") + u2token = self.login("u2", "pass") + + self.helper.invite(r1, u1, u2, tok=u1token) + + r1stats_ante = self._get_current_stats("room", r1) + + self.helper.join(r1, u2, tok=u2token) + + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + self.assertEqual( + r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], + 0, + ) + self.assertEqual( + r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1 + ) + self.assertEqual( + r1stats_post["invited_members"] - r1stats_ante["invited_members"], -1 + ) + + def test_left(self): + """ + When a user leaves a room after joining, total_events and + left_members should increase by exactly 1. + current_state_events should not increase. + joined_members should decrease by exactly 1. 
+ """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + u2 = self.register_user("u2", "pass") + u2token = self.login("u2", "pass") + + self.helper.join(r1, u2, tok=u2token) + + r1stats_ante = self._get_current_stats("room", r1) + + self.helper.leave(r1, u2, tok=u2token) + + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + self.assertEqual( + r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], + 0, + ) + self.assertEqual( + r1stats_post["left_members"] - r1stats_ante["left_members"], +1 + ) + self.assertEqual( + r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1 + ) + + def test_banned(self): + """ + When a user is banned from a room after joining, total_events and + left_members should increase by exactly 1. + current_state_events should not increase. + banned_members should decrease by exactly 1. 
+ """ + + self._perform_background_initial_update() + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + u2 = self.register_user("u2", "pass") + u2token = self.login("u2", "pass") + + self.helper.join(r1, u2, tok=u2token) + + r1stats_ante = self._get_current_stats("room", r1) + + self.helper.change_membership(r1, u1, u2, "ban", tok=u1token) + + r1stats_post = self._get_current_stats("room", r1) + + self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) + self.assertEqual( + r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], + 0, + ) + self.assertEqual( + r1stats_post["banned_members"] - r1stats_ante["banned_members"], +1 + ) + self.assertEqual( + r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1 + ) + + def test_initial_background_update(self): + """ + Test that statistics can be generated by the initial background update + handler. + + This test also tests that stats rows are not created for new subjects + when stats are disabled. However, it may be desirable to change this + behaviour eventually to still keep current rows. + """ + + self.hs.config.stats_enabled = False + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token) + + # test that these subjects, which were created during a time of disabled + # stats, do not have stats. 
+ self.assertIsNone(self._get_current_stats("room", r1)) + self.assertIsNone(self._get_current_stats("user", u1)) + + self.hs.config.stats_enabled = True + + self._perform_background_initial_update() + + r1stats = self._get_current_stats("room", r1) + u1stats = self._get_current_stats("user", u1) + + self.assertEqual(r1stats["joined_members"], 1) + self.assertEqual( + r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM + ) + + self.assertEqual(u1stats["joined_rooms"], 1) + + def test_incomplete_stats(self): + """ + This tests that we track incomplete statistics. + + We first test that incomplete stats are incrementally generated, + following the preparation of a background regen. + + We then test that these incomplete rows are completed by the background + regen. + """ + + u1 = self.register_user("u1", "pass") + u1token = self.login("u1", "pass") + u2 = self.register_user("u2", "pass") + u2token = self.login("u2", "pass") + u3 = self.register_user("u3", "pass") + r1 = self.helper.create_room_as(u1, tok=u1token, is_public=False) + + # preparation stage of the initial background update + # Ugh, have to reset this flag + self.store._all_done = False + + self.get_success( + self.store._simple_delete( + "room_stats_current", {"1": 1}, "test_delete_stats" + ) + ) + self.get_success( + self.store._simple_delete( + "user_stats_current", {"1": 1}, "test_delete_stats" + ) + ) + + self.helper.invite(r1, u1, u2, tok=u1token) + self.helper.join(r1, u2, tok=u2token) + self.helper.invite(r1, u1, u3, tok=u1token) + self.helper.send(r1, "thou shalt yield", tok=u1token) + + # now do the background updates + + self.store._all_done = False + self.get_success( + self.store._simple_insert( + "background_updates", + { + "update_name": "populate_stats_process_rooms", + "progress_json": "{}", + "depends_on": "populate_stats_prepare", + }, + ) + ) + self.get_success( + self.store._simple_insert( + "background_updates", + { + "update_name": 
"populate_stats_process_users", + "progress_json": "{}", + "depends_on": "populate_stats_process_rooms", + }, + ) + ) + self.get_success( + self.store._simple_insert( + "background_updates", + { + "update_name": "populate_stats_cleanup", + "progress_json": "{}", + "depends_on": "populate_stats_process_users", + }, + ) + ) while not self.get_success(self.store.has_completed_background_updates()): self.get_success(self.store.do_next_background_update(100), by=0.1) - events = {"a1": None, "a2": {"membership": Membership.JOIN}} - - def get_event(event_id, allow_none=True): - if events.get(event_id): - m = Mock() - m.content = events[event_id] - else: - m = None - d = defer.Deferred() - self.reactor.callLater(0.0, d.callback, m) - return d - - def get_received_ts(event_id): - return defer.succeed(1) - - self.store.get_received_ts = get_received_ts - self.store.get_event = get_event - - deltas = [ - { - "type": EventTypes.Member, - "state_key": "some_user:test", - "room_id": room_1, - "event_id": "a2", - "prev_event_id": "a1", - "stream_id": 100, - } - ] - - # Handle our fake deltas, which has a user going from LEAVE -> JOIN. - self.get_success(self.handler._handle_deltas(deltas)) - - # One delta, with two joined members -- the room creator, and our fake - # user. 
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) - self.assertEqual(len(r), 1) - self.assertEqual(r[0]["joined_members"], 2) + r1stats_complete = self._get_current_stats("room", r1) + u1stats_complete = self._get_current_stats("user", u1) + u2stats_complete = self._get_current_stats("user", u2) + + # now we make our assertions + + # check that _complete rows are complete and correct + self.assertEqual(r1stats_complete["joined_members"], 2) + self.assertEqual(r1stats_complete["invited_members"], 1) + + self.assertEqual( + r1stats_complete["current_state_events"], + 2 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM, + ) + + self.assertEqual(u1stats_complete["joined_rooms"], 1) + self.assertEqual(u2stats_complete["joined_rooms"], 1) diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 9915367144..cdded88b7f 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -128,8 +128,12 @@ class RestHelper(object): return channel.json_body - def send_state(self, room_id, event_type, body, tok, expect_code=200): - path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type) + def send_state(self, room_id, event_type, body, tok, expect_code=200, state_key=""): + path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % ( + room_id, + event_type, + state_key, + ) if tok: path = path + "?access_token=%s" % tok -- cgit 1.4.1 From 1e19ce00bff8d67168d39201cdf9424f7b2f22f6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 17 Sep 2019 11:41:54 +0100 Subject: Add 'failure_ts' column to 'destinations' table (#6016) Track the time that a server started failing at, for general analysis purposes. 
--- changelog.d/6016.misc | 1 + .../schema/delta/56/destinations_failure_ts.sql | 25 ++++ synapse/storage/transactions.py | 23 ++-- synapse/util/retryutils.py | 16 ++- tests/handlers/test_typing.py | 7 +- tests/storage/test_transactions.py | 8 +- tests/util/test_retryutils.py | 127 +++++++++++++++++++++ 7 files changed, 195 insertions(+), 12 deletions(-) create mode 100644 changelog.d/6016.misc create mode 100644 synapse/storage/schema/delta/56/destinations_failure_ts.sql create mode 100644 tests/util/test_retryutils.py (limited to 'synapse/storage/schema') diff --git a/changelog.d/6016.misc b/changelog.d/6016.misc new file mode 100644 index 0000000000..91cf164714 --- /dev/null +++ b/changelog.d/6016.misc @@ -0,0 +1 @@ +Add a 'failure_ts' column to the 'destinations' database table. diff --git a/synapse/storage/schema/delta/56/destinations_failure_ts.sql b/synapse/storage/schema/delta/56/destinations_failure_ts.sql new file mode 100644 index 0000000000..f00889290b --- /dev/null +++ b/synapse/storage/schema/delta/56/destinations_failure_ts.sql @@ -0,0 +1,25 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * Record the timestamp when a given server started failing + */ +ALTER TABLE destinations ADD failure_ts BIGINT; + +/* as a rough approximation, we assume that the server started failing at + * retry_interval before the last retry + */ +UPDATE destinations SET failure_ts = retry_last_ts - retry_interval + WHERE retry_last_ts > 0; diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index d81ace0ece..289c117396 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -165,7 +165,7 @@ class TransactionStore(SQLBaseStore): txn, table="destinations", keyvalues={"destination": destination}, - retcols=("destination", "retry_last_ts", "retry_interval"), + retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), allow_none=True, ) @@ -174,12 +174,15 @@ class TransactionStore(SQLBaseStore): else: return None - def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): + def set_destination_retry_timings( + self, destination, failure_ts, retry_last_ts, retry_interval + ): """Sets the current retry timings for a given destination. Both timings should be zero if retrying is no longer occuring. Args: destination (str) + failure_ts (int|None) - when the server started failing (ms since epoch) retry_last_ts (int) - time of last retry attempt in unix epoch ms retry_interval (int) - how long until next retry in ms """ @@ -189,12 +192,13 @@ class TransactionStore(SQLBaseStore): "set_destination_retry_timings", self._set_destination_retry_timings, destination, + failure_ts, retry_last_ts, retry_interval, ) def _set_destination_retry_timings( - self, txn, destination, retry_last_ts, retry_interval + self, txn, destination, failure_ts, retry_last_ts, retry_interval ): if self.database_engine.can_native_upsert: @@ -202,9 +206,12 @@ class TransactionStore(SQLBaseStore): # resetting it) or greater than the existing retry interval. 
sql = """ - INSERT INTO destinations (destination, retry_last_ts, retry_interval) - VALUES (?, ?, ?) + INSERT INTO destinations ( + destination, failure_ts, retry_last_ts, retry_interval + ) + VALUES (?, ?, ?, ?) ON CONFLICT (destination) DO UPDATE SET + failure_ts = EXCLUDED.failure_ts, retry_last_ts = EXCLUDED.retry_last_ts, retry_interval = EXCLUDED.retry_interval WHERE @@ -212,7 +219,7 @@ class TransactionStore(SQLBaseStore): OR destinations.retry_interval < EXCLUDED.retry_interval """ - txn.execute(sql, (destination, retry_last_ts, retry_interval)) + txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval)) return @@ -225,7 +232,7 @@ class TransactionStore(SQLBaseStore): txn, table="destinations", keyvalues={"destination": destination}, - retcols=("retry_last_ts", "retry_interval"), + retcols=("failure_ts", "retry_last_ts", "retry_interval"), allow_none=True, ) @@ -235,6 +242,7 @@ class TransactionStore(SQLBaseStore): table="destinations", values={ "destination": destination, + "failure_ts": failure_ts, "retry_last_ts": retry_last_ts, "retry_interval": retry_interval, }, @@ -245,6 +253,7 @@ class TransactionStore(SQLBaseStore): "destinations", keyvalues={"destination": destination}, updatevalues={ + "failure_ts": failure_ts, "retry_last_ts": retry_last_ts, "retry_interval": retry_interval, }, diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index b740913b58..a5f2fbef5c 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -80,11 +80,13 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs) # We aren't ready to retry that destination. 
raise """ + failure_ts = None retry_last_ts, retry_interval = (0, 0) retry_timings = yield store.get_destination_retry_timings(destination) if retry_timings: + failure_ts = retry_timings["failure_ts"] retry_last_ts, retry_interval = ( retry_timings["retry_last_ts"], retry_timings["retry_interval"], @@ -108,6 +110,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs) destination, clock, store, + failure_ts, retry_interval, backoff_on_failure=backoff_on_failure, **kwargs @@ -120,6 +123,7 @@ class RetryDestinationLimiter(object): destination, clock, store, + failure_ts, retry_interval, backoff_on_404=False, backoff_on_failure=True, @@ -133,6 +137,8 @@ class RetryDestinationLimiter(object): destination (str) clock (Clock) store (DataStore) + failure_ts (int|None): when this destination started failing (in ms since + the epoch), or zero if the last request was successful retry_interval (int): The next retry interval taken from the database in milliseconds, or zero if the last request was successful. 
@@ -145,6 +151,7 @@ class RetryDestinationLimiter(object): self.store = store self.destination = destination + self.failure_ts = failure_ts self.retry_interval = retry_interval self.backoff_on_404 = backoff_on_404 self.backoff_on_failure = backoff_on_failure @@ -186,6 +193,7 @@ class RetryDestinationLimiter(object): logger.debug( "Connection to %s was successful; clearing backoff", self.destination ) + self.failure_ts = None retry_last_ts = 0 self.retry_interval = 0 elif not self.backoff_on_failure: @@ -211,11 +219,17 @@ class RetryDestinationLimiter(object): ) retry_last_ts = int(self.clock.time_msec()) + if self.failure_ts is None: + self.failure_ts = retry_last_ts + @defer.inlineCallbacks def store_retry_timings(): try: yield self.store.set_destination_retry_timings( - self.destination, retry_last_ts, self.retry_interval + self.destination, + self.failure_ts, + retry_last_ts, + self.retry_interval, ) except Exception: logger.exception("Failed to store destination_retry_timings") diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 5d5e324df2..1f2ef5d01f 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -99,7 +99,12 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.event_source = hs.get_event_sources().sources["typing"] self.datastore = hs.get_datastore() - retry_timings_res = {"destination": "", "retry_last_ts": 0, "retry_interval": 0} + retry_timings_res = { + "destination": "", + "retry_last_ts": 0, + "retry_interval": 0, + "failure_ts": None, + } self.datastore.get_destination_retry_timings.return_value = defer.succeed( retry_timings_res ) diff --git a/tests/storage/test_transactions.py b/tests/storage/test_transactions.py index 14169afa96..a771d5af29 100644 --- a/tests/storage/test_transactions.py +++ b/tests/storage/test_transactions.py @@ -29,17 +29,19 @@ class TransactionStoreTestCase(HomeserverTestCase): r = self.get_success(d) self.assertIsNone(r) - d = 
self.store.set_destination_retry_timings("example.com", 50, 100) + d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100) self.get_success(d) d = self.store.get_destination_retry_timings("example.com") r = self.get_success(d) - self.assert_dict({"retry_last_ts": 50, "retry_interval": 100}, r) + self.assert_dict( + {"retry_last_ts": 50, "retry_interval": 100, "failure_ts": 1000}, r + ) def test_initial_set_transactions(self): """Tests that we can successfully set the destination retries (there was a bug around invalidating the cache that broke this) """ - d = self.store.set_destination_retry_timings("example.com", 50, 100) + d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100) self.get_success(d) diff --git a/tests/util/test_retryutils.py b/tests/util/test_retryutils.py new file mode 100644 index 0000000000..9e348694ad --- /dev/null +++ b/tests/util/test_retryutils.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from synapse.util.retryutils import ( + MIN_RETRY_INTERVAL, + RETRY_MULTIPLIER, + NotRetryingDestination, + get_retry_limiter, +) + +from tests.unittest import HomeserverTestCase + + +class RetryLimiterTestCase(HomeserverTestCase): + def test_new_destination(self): + """A happy-path case with a new destination and a successful operation""" + store = self.hs.get_datastore() + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + limiter = self.successResultOf(d) + + # advance the clock a bit before making the request + self.pump(1) + + with limiter: + pass + + d = store.get_destination_retry_timings("test_dest") + self.pump() + new_timings = self.successResultOf(d) + self.assertIsNone(new_timings) + + def test_limiter(self): + """General test case which walks through the process of a failing request""" + store = self.hs.get_datastore() + + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + limiter = self.successResultOf(d) + + self.pump(1) + try: + with limiter: + self.pump(1) + failure_ts = self.clock.time_msec() + raise AssertionError("argh") + except AssertionError: + pass + + # wait for the update to land + self.pump() + + d = store.get_destination_retry_timings("test_dest") + self.pump() + new_timings = self.successResultOf(d) + self.assertEqual(new_timings["failure_ts"], failure_ts) + self.assertEqual(new_timings["retry_last_ts"], failure_ts) + self.assertEqual(new_timings["retry_interval"], MIN_RETRY_INTERVAL) + + # now if we try again we should get a failure + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + self.failureResultOf(d, NotRetryingDestination) + + # + # advance the clock and try again + # + + self.pump(MIN_RETRY_INTERVAL) + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + limiter = self.successResultOf(d) + + self.pump(1) + try: + with limiter: + self.pump(1) + retry_ts = self.clock.time_msec() + raise AssertionError("argh") + except AssertionError: + pass + + # wait 
for the update to land + self.pump() + + d = store.get_destination_retry_timings("test_dest") + self.pump() + new_timings = self.successResultOf(d) + self.assertEqual(new_timings["failure_ts"], failure_ts) + self.assertEqual(new_timings["retry_last_ts"], retry_ts) + self.assertGreaterEqual( + new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5 + ) + self.assertLessEqual( + new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0 + ) + + # + # one more go, with success + # + self.pump(MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0) + d = get_retry_limiter("test_dest", self.clock, store) + self.pump() + limiter = self.successResultOf(d) + + self.pump(1) + with limiter: + self.pump(1) + + # wait for the update to land + self.pump() + + d = store.get_destination_retry_timings("test_dest") + self.pump() + new_timings = self.successResultOf(d) + self.assertIsNone(new_timings) -- cgit 1.4.1