diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/server.py | 6 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/_base.py | 10 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/filter.py | 6 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/sync.py | 4 | ||||
-rw-r--r-- | synapse/rest/media/v1/media_repository.py | 4 | ||||
-rw-r--r-- | synapse/storage/background_updates.py | 21 | ||||
-rw-r--r-- | synapse/storage/events.py | 33 | ||||
-rw-r--r-- | synapse/storage/schema/delta/37/remove_auth_idx.py | 4 | ||||
-rw-r--r-- | synapse/storage/schema/delta/41/event_search_event_id_idx.sql | 17 |
9 files changed, 92 insertions, 13 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py index 25e6666238..3910b9dc31 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -35,6 +35,8 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + self.filter_timeline_limit = config.get("filter_timeline_limit", -1) + if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': self.public_baseurl += '/' @@ -161,6 +163,10 @@ class ServerConfig(Config): # The GC threshold parameters to pass to `gc.set_threshold`, if defined # gc_thresholds: [700, 10, 10] + # Set the limit on the returned events in the timeline in the get + # and sync operations. The default value is -1, means no upper limit. + # filter_timeline_limit: 5000 + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index 20e765f48f..1f5bc24cc3 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -47,3 +47,13 @@ def client_v2_patterns(path_regex, releases=(0,), new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release) patterns.append(re.compile("^" + new_prefix + path_regex)) return patterns + + +def set_timeline_upper_limit(filter_json, filter_timeline_limit): + if filter_timeline_limit < 0: + return # no upper limits + timeline = filter_json.get('room', {}).get('timeline', {}) + if 'limit' in timeline: + filter_json['room']['timeline']["limit"] = min( + filter_json['room']['timeline']['limit'], + filter_timeline_limit) diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index b4084fec62..d2b2fd66e6 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -20,6 +20,7 @@ from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID from ._base import client_v2_patterns +from ._base import set_timeline_upper_limit import logging @@ -85,6 +86,11 @@ class CreateFilterRestServlet(RestServlet): raise AuthError(403, "Can only create filters for local users") content = parse_json_object_from_request(request) + set_timeline_upper_limit( + content, + self.hs.config.filter_timeline_limit + ) + filter_id = yield self.filtering.add_user_filter( user_localpart=target_user.localpart, user_filter=content, diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index f30eab76fd..771e127ab9 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -28,6 +28,7 @@ from synapse.api.filtering import FilterCollection, DEFAULT_FILTER_COLLECTION from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from ._base import client_v2_patterns +from ._base import set_timeline_upper_limit import itertools import logging @@ -78,6 +79,7 @@ class SyncRestServlet(RestServlet): def __init__(self, hs): super(SyncRestServlet, self).__init__() + self.hs = hs self.auth = hs.get_auth() self.sync_handler = hs.get_sync_handler() self.clock = hs.get_clock() @@ -121,6 +123,8 @@ class SyncRestServlet(RestServlet): if filter_id.startswith('{'): try: filter_object = json.loads(filter_id) + set_timeline_upper_limit(filter_object, + self.hs.config.filter_timeline_limit) except: raise SynapseError(400, "Invalid filter JSON") self.filtering.check_valid_filter(filter_object) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index c43b185e08..caca96c222 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError, HttpResponseException, \ from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.retryutils import NotRetryingDestination import os import errno @@ -181,7 +182,8 @@ class MediaRepository(object): logger.exception("Failed to fetch remote media %s/%s", server_name, media_id) raise - + except NotRetryingDestination: + logger.warn("Not retrying destination %r", server_name) except Exception: logger.exception("Failed to fetch remote media %s/%s", server_name, media_id) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index d4cf0fc59b..7157fb1dfb 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -210,7 +210,9 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_handlers[update_name] = update_handler def register_background_index_update(self, update_name, index_name, - table, columns, where_clause=None): + table, columns, where_clause=None, + unique=False, + psql_only=False): """Helper for store classes to do a background index addition To use: @@ -226,6 +228,9 @@ class BackgroundUpdateStore(SQLBaseStore): index_name (str): name of index to add table (str): table to add index to columns (list[str]): columns/expressions to include in index + unique (bool): true to make a UNIQUE index + psql_only: true to only create this index on psql databases (useful + for virtual sqlite tables) """ def create_index_psql(conn): @@ -245,9 +250,11 @@ class BackgroundUpdateStore(SQLBaseStore): c.execute(sql) sql = ( - "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s" + "CREATE %(unique)s INDEX CONCURRENTLY %(name)s" + " ON %(table)s" " (%(columns)s) %(where_clause)s" ) % { + "unique": "UNIQUE" if unique else "", "name": index_name, "table": table, "columns": ", ".join(columns), @@ -270,9 +277,10 @@ class BackgroundUpdateStore(SQLBaseStore): # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.) sql = ( - "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s" + "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s" " (%(columns)s)" ) % { + "unique": "UNIQUE" if unique else "", "name": index_name, "table": table, "columns": ", ".join(columns), @@ -284,13 +292,16 @@ class BackgroundUpdateStore(SQLBaseStore): if isinstance(self.database_engine, engines.PostgresEngine): runner = create_index_psql + elif psql_only: + runner = None else: runner = create_index_sqlite @defer.inlineCallbacks def updater(progress, batch_size): - logger.info("Adding index %s to %s", index_name, table) - yield self.runWithConnection(runner) + if runner is not None: + logger.info("Adding index %s to %s", index_name, table) + yield self.runWithConnection(runner) yield self._end_background_update(update_name) defer.returnValue(1) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dbd63078c6..ea6879c619 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -207,6 +207,18 @@ class EventsStore(SQLBaseStore): where_clause="contains_url = true AND outlier = false", ) + # an event_id index on event_search is useful for the purge_history + # api. Plus it means we get to enforce some integrity with a UNIQUE + # clause + self.register_background_index_update( + "event_search_event_id_idx", + index_name="event_search_event_id_idx", + table="event_search", + columns=["event_id"], + unique=True, + psql_only=True, + ) + self._event_persist_queue = _EventPeristenceQueue() def persist_events(self, events_and_contexts, backfilled=False): @@ -2022,6 +2034,8 @@ class EventsStore(SQLBaseStore): 400, "topological_ordering is greater than forward extremeties" ) + logger.debug("[purge] looking for events to delete") + txn.execute( "SELECT event_id, state_key FROM events" " LEFT JOIN state_events USING (room_id, event_id)" @@ -2030,6 +2044,14 @@ class EventsStore(SQLBaseStore): ) event_rows = txn.fetchall() + to_delete = [ + (event_id,) for event_id, state_key in event_rows + if state_key is None and not self.hs.is_mine_id(event_id) + ] + logger.info( + "[purge] found %i events before cutoff, of which %i are remote" + " non-state events to delete", len(event_rows), len(to_delete)) + for event_id, state_key in event_rows: txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) @@ -2080,6 +2102,7 @@ class EventsStore(SQLBaseStore): ) state_rows = txn.fetchall() + logger.debug("[purge] found %i redundant state groups", len(state_rows)) # make a set of the redundant state groups, so that we can look them up # efficiently @@ -2173,10 +2196,6 @@ class EventsStore(SQLBaseStore): ) # Delete all remote non-state events - to_delete = [ - (event_id,) for event_id, state_key in event_rows - if state_key is None and not self.hs.is_mine_id(event_id) - ] for table in ( "events", "event_json", @@ -2192,7 +2211,7 @@ class EventsStore(SQLBaseStore): "event_signatures", "rejections", ): - logger.debug("[purge] removing non-state events from %s", table) + logger.debug("[purge] removing remote non-state events from %s", table) txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), @@ -2200,7 +2219,7 @@ class EventsStore(SQLBaseStore): ) # Mark all state and own events as outliers - logger.debug("[purge] marking events as outliers") + logger.debug("[purge] marking remaining events as outliers") txn.executemany( "UPDATE events SET outlier = ?" " WHERE event_id = ?", @@ -2210,7 +2229,7 @@ class EventsStore(SQLBaseStore): ] ) - logger.debug("[purge] done") + logger.info("[purge] done") @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py index 784f3b348f..20ad8bd5a6 100644 --- a/synapse/storage/schema/delta/37/remove_auth_idx.py +++ b/synapse/storage/schema/delta/37/remove_auth_idx.py @@ -36,6 +36,10 @@ DROP INDEX IF EXISTS transactions_have_ref; -- and is used incredibly rarely. DROP INDEX IF EXISTS events_order_topo_stream_room; +-- an equivalent index to this actually gets re-created in delta 41, because it +-- turned out that deleting it wasn't a great plan :/. In any case, let's +-- delete it here, and delta 41 will create a new one with an added UNIQUE +-- constraint DROP INDEX IF EXISTS event_search_ev_idx; """ diff --git a/synapse/storage/schema/delta/41/event_search_event_id_idx.sql b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql new file mode 100644 index 0000000000..5d9cfecf36 --- /dev/null +++ b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('event_search_event_id_idx', '{}'); |