diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/crypto/event_signing.py | 6 | ||||
-rw-r--r-- | synapse/events/snapshot.py | 100 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 2 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 4 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/sync.py | 11 | ||||
-rw-r--r-- | synapse/rest/media/v1/preview_url_resource.py | 14 | ||||
-rw-r--r-- | synapse/storage/background_updates.py | 9 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/e2e_room_keys.py | 8 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/event_federation.py | 4 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/roommember.py | 2 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/schema/delta/56/delete_keys_from_deleted_backups.sql | 25 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/state.py | 7 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/stats.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 9 |
15 files changed, 115 insertions, 90 deletions
diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 694fb2c816..ccaa8a9920 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -125,9 +125,11 @@ def compute_event_signature(event_dict, signature_name, signing_key): redact_json = prune_event_dict(event_dict) redact_json.pop("age_ts", None) redact_json.pop("unsigned", None) - logger.debug("Signing event: %s", encode_canonical_json(redact_json)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Signing event: %s", encode_canonical_json(redact_json)) redact_json = sign_json(redact_json, signature_name, signing_key) - logger.debug("Signed event: %s", encode_canonical_json(redact_json)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Signed event: %s", encode_canonical_json(redact_json)) return redact_json["signatures"] diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index acbcbeeced..27cd8a63ff 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from six import iteritems +import attr from frozendict import frozendict from twisted.internet import defer @@ -22,7 +22,8 @@ from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background -class EventContext(object): +@attr.s(slots=True) +class EventContext: """ Attributes: state_group (int|None): state group id, if the state has been stored @@ -31,9 +32,6 @@ class EventContext(object): rejected (bool|str): A rejection reason if the event was rejected, else False - push_actions (list[(str, list[object])]): list of (user_id, actions) - tuples - prev_group (int): Previously persisted state group. ``None`` for an outlier. delta_ids (dict[(str, str), str]): Delta from ``prev_group``. @@ -42,6 +40,8 @@ class EventContext(object): prev_state_events (?): XXX: is this ever set to anything other than the empty list? + app_service: FIXME + _current_state_ids (dict[(str, str), str]|None): The current state map including the current event. None if outlier or we haven't fetched the state from DB yet. @@ -67,49 +67,33 @@ class EventContext(object): Only set when state has not been fetched yet. """ - __slots__ = [ - "state_group", - "rejected", - "prev_group", - "delta_ids", - "prev_state_events", - "app_service", - "_current_state_ids", - "_prev_state_ids", - "_prev_state_id", - "_event_type", - "_event_state_key", - "_fetching_state_deferred", - ] - - def __init__(self): - self.prev_state_events = [] - self.rejected = False - self.app_service = None + state_group = attr.ib(default=None) + rejected = attr.ib(default=False) + prev_group = attr.ib(default=None) + delta_ids = attr.ib(default=None) + prev_state_events = attr.ib(default=attr.Factory(list)) + app_service = attr.ib(default=None) + + _current_state_ids = attr.ib(default=None) + _prev_state_ids = attr.ib(default=None) + _prev_state_id = attr.ib(default=None) + + _event_type = attr.ib(default=None) + _event_state_key = attr.ib(default=None) + _fetching_state_deferred = attr.ib(default=None) @staticmethod def with_state( state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None ): - context = EventContext() - - # The current state including the current event - context._current_state_ids = current_state_ids - # The current state excluding the current event - context._prev_state_ids = prev_state_ids - context.state_group = state_group - - context._prev_state_id = None - context._event_type = None - context._event_state_key = None - context._fetching_state_deferred = defer.succeed(None) - - # A previously persisted state group and a delta between that - # and this state. - context.prev_group = prev_group - context.delta_ids = delta_ids - - return context + return EventContext( + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + state_group=state_group, + fetching_state_deferred=defer.succeed(None), + prev_group=prev_group, + delta_ids=delta_ids, + ) @defer.inlineCallbacks def serialize(self, event, store): @@ -157,24 +141,18 @@ class EventContext(object): Returns: EventContext """ - context = EventContext() - - # We use the state_group and prev_state_id stuff to pull the - # current_state_ids out of the DB and construct prev_state_ids. - context._prev_state_id = input["prev_state_id"] - context._event_type = input["event_type"] - context._event_state_key = input["event_state_key"] - - context._current_state_ids = None - context._prev_state_ids = None - context._fetching_state_deferred = None - - context.state_group = input["state_group"] - context.prev_group = input["prev_group"] - context.delta_ids = _decode_state_dict(input["delta_ids"]) - - context.rejected = input["rejected"] - context.prev_state_events = input["prev_state_events"] + context = EventContext( + # We use the state_group and prev_state_id stuff to pull the + # current_state_ids out of the DB and construct prev_state_ids. + prev_state_id=input["prev_state_id"], + event_type=input["event_type"], + event_state_key=input["event_state_key"], + state_group=input["state_group"], + prev_group=input["prev_group"], + delta_ids=_decode_state_dict(input["delta_ids"]), + rejected=input["rejected"], + prev_state_events=input["prev_state_events"], + ) app_service_id = input["app_service_id"] if app_service_id: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5b22a39b7f..f5c1632916 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -196,7 +196,7 @@ class FederationClient(FederationBase): dest, room_id, extremities, limit ) - logger.debug("backfill transaction_data=%s", repr(transaction_data)) + logger.debug("backfill transaction_data=%r", transaction_data) room_version = yield self.store.get_room_version(room_id) format_ver = room_version_to_event_format(room_version) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 7b18408144..920fa86853 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -122,10 +122,10 @@ class TransportLayerClient(object): Deferred: Results in a dict received from the remote homeserver. """ logger.debug( - "backfill dest=%s, room_id=%s, event_tuples=%s, limit=%s", + "backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s", destination, room_id, - repr(event_tuples), + event_tuples, str(limit), ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 22491f3700..2bbdd11941 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -79,7 +79,7 @@ class BulkPushRuleEvaluator(object): dict of user_id -> push_rules """ room_id = event.room_id - rules_for_room = self._get_rules_for_room(room_id) + rules_for_room = yield self._get_rules_for_room(room_id) rules_by_user = yield rules_for_room.get_rules(event, context) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index a883c8adda..541a6b0e10 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -112,9 +112,14 @@ class SyncRestServlet(RestServlet): full_state = parse_boolean(request, "full_state", default=False) logger.debug( - "/sync: user=%r, timeout=%r, since=%r," - " set_presence=%r, filter_id=%r, device_id=%r" - % (user, timeout, since, set_presence, filter_id, device_id) + "/sync: user=%r, timeout=%r, since=%r, " + "set_presence=%r, filter_id=%r, device_id=%r", + user, + timeout, + since, + set_presence, + filter_id, + device_id, ) request_key = (user, timeout, since, filter_id, full_state, device_id) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 0c68c3aad5..094ebad770 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -117,8 +117,10 @@ class PreviewUrlResource(DirectServeResource): pattern = entry[attrib] value = getattr(url_tuple, attrib) logger.debug( - ("Matching attrib '%s' with value '%s' against" " pattern '%s'") - % (attrib, value, pattern) + "Matching attrib '%s' with value '%s' against" " pattern '%s'", + attrib, + value, + pattern, ) if value is None: @@ -186,7 +188,7 @@ class PreviewUrlResource(DirectServeResource): media_info = yield self._download_url(url, user) - logger.debug("got media_info of '%s'" % media_info) + logger.debug("got media_info of '%s'", media_info) if _is_media(media_info["media_type"]): file_id = media_info["filesystem_id"] @@ -254,7 +256,7 @@ class PreviewUrlResource(DirectServeResource): og["og:image:width"] = dims["width"] og["og:image:height"] = dims["height"] else: - logger.warn("Couldn't get dims for %s" % og["og:image"]) + logger.warn("Couldn't get dims for %s", og["og:image"]) og["og:image"] = "mxc://%s/%s" % ( self.server_name, @@ -268,7 +270,7 @@ class PreviewUrlResource(DirectServeResource): logger.warn("Failed to find any OG data in %s", url) og = {} - logger.debug("Calculated OG for %s as %s" % (url, og)) + logger.debug("Calculated OG for %s as %s", url, og) jsonog = json.dumps(og) @@ -297,7 +299,7 @@ class PreviewUrlResource(DirectServeResource): with self.media_storage.store_into_file(file_info) as (f, fname, finish): try: - logger.debug("Trying to get url '%s'" % url) + logger.debug("Trying to get url '%s'", url) length, headers, uri, code = yield self.client.get_file( url, output_stream=f, max_size=self.max_spider_size ) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 80b57a948c..37d469ffd7 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -94,13 +94,16 @@ class BackgroundUpdateStore(SQLBaseStore): self._all_done = False def start_doing_background_updates(self): - run_as_background_process("background_updates", self._run_background_updates) + run_as_background_process("background_updates", self.run_background_updates) @defer.inlineCallbacks - def _run_background_updates(self): + def run_background_updates(self, sleep=True): logger.info("Starting background schema updates") while True: - yield self.hs.get_clock().sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) + if sleep: + yield self.hs.get_clock().sleep( + self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0 + ) try: result = yield self.do_next_background_update( diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py index ef88e79293..1cbbae5b63 100644 --- a/synapse/storage/data_stores/main/e2e_room_keys.py +++ b/synapse/storage/data_stores/main/e2e_room_keys.py @@ -321,9 +321,17 @@ class EndToEndRoomKeyStore(SQLBaseStore): def _delete_e2e_room_keys_version_txn(txn): if version is None: this_version = self._get_current_version(txn, user_id) + if this_version is None: + raise StoreError(404, "No current backup version") else: this_version = version + self._simple_delete_txn( + txn, + table="e2e_room_keys", + keyvalues={"user_id": user_id, "version": this_version}, + ) + return self._simple_update_one_txn( txn, table="e2e_room_keys_versions", diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index a470a48e0f..90bef0cd2c 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -364,9 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) def _get_backfill_events(self, txn, room_id, event_list, limit): - logger.debug( - "_get_backfill_events: %s, %s, %s", room_id, repr(event_list), limit - ) + logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit) event_results = set() diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index e47ab604dd..bc04bfd7d4 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -720,7 +720,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): # See bulk_get_push_rules_for_room for how we work around this. assert state_group is not None - cache = self._get_joined_hosts_cache(room_id) + cache = yield self._get_joined_hosts_cache(room_id) joined_hosts = yield cache.get_destinations(state_entry) return joined_hosts diff --git a/synapse/storage/data_stores/main/schema/delta/56/delete_keys_from_deleted_backups.sql b/synapse/storage/data_stores/main/schema/delta/56/delete_keys_from_deleted_backups.sql new file mode 100644 index 0000000000..1d2ddb1b1a --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/delete_keys_from_deleted_backups.sql @@ -0,0 +1,25 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* delete room keys that belong to deleted room key version, or to room key + * versions that don't exist (anymore) + */ +DELETE FROM e2e_room_keys +WHERE version NOT IN ( + SELECT version + FROM e2e_room_keys_versions + WHERE e2e_room_keys.user_id = e2e_room_keys_versions.user_id + AND e2e_room_keys_versions.deleted = 0 +); diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index d54442e5fa..9b2207075b 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -15,6 +15,7 @@ import logging from collections import namedtuple +from typing import Iterable, Tuple from six import iteritems, itervalues from six.moves import range @@ -23,6 +24,8 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.api.errors import NotFoundError +from synapse.events import EventBase +from synapse.events.snapshot import EventContext from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.data_stores.main.events_worker import EventsWorkerStore @@ -1215,7 +1218,9 @@ class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore): def __init__(self, db_conn, hs): super(StateStore, self).__init__(db_conn, hs) - def _store_event_state_mappings_txn(self, txn, events_and_contexts): + def _store_event_state_mappings_txn( + self, txn, events_and_contexts: Iterable[Tuple[EventBase, EventContext]] + ): state_groups = {} for event, context in events_and_contexts: if event.internal_metadata.is_outlier(): diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py index 5ab639b2ad..4d59b7833f 100644 --- a/synapse/storage/data_stores/main/stats.py +++ b/synapse/storage/data_stores/main/stats.py @@ -332,7 +332,7 @@ class StatsStore(StateDeltasStore): def _bulk_update_stats_delta_txn(txn): for stats_type, stats_updates in updates.items(): for stats_id, fields in stats_updates.items(): - logger.info( + logger.debug( "Updating %s stats for %s: %s", stats_type, stats_id, fields ) self._update_stats_delta_txn( diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 5ac2530a6a..0e8da27f53 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -438,7 +438,7 @@ class CacheDescriptor(_CacheDescriptorBase): if isinstance(cached_result_d, ObservableDeferred): observer = cached_result_d.observe() else: - observer = cached_result_d + observer = defer.succeed(cached_result_d) except KeyError: ret = defer.maybeDeferred( @@ -482,9 +482,8 @@ class CacheListDescriptor(_CacheDescriptorBase): Given a list of keys it looks in the cache to find any hits, then passes the list of missing keys to the wrapped function. - Once wrapped, the function returns either a Deferred which resolves to - the list of results, or (if all results were cached), just the list of - results. + Once wrapped, the function returns a Deferred which resolves to the list + of results. """ def __init__( @@ -618,7 +617,7 @@ class CacheListDescriptor(_CacheDescriptorBase): ) return make_deferred_yieldable(d) else: - return results + return defer.succeed(results) obj.__dict__[self.orig.__name__] = wrapped |