summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/server.py6
-rw-r--r--synapse/rest/client/v2_alpha/_base.py10
-rw-r--r--synapse/rest/client/v2_alpha/filter.py6
-rw-r--r--synapse/rest/client/v2_alpha/sync.py4
-rw-r--r--synapse/rest/media/v1/media_repository.py4
-rw-r--r--synapse/storage/background_updates.py21
-rw-r--r--synapse/storage/events.py33
-rw-r--r--synapse/storage/schema/delta/37/remove_auth_idx.py4
-rw-r--r--synapse/storage/schema/delta/41/event_search_event_id_idx.sql17
9 files changed, 92 insertions, 13 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 25e6666238..3910b9dc31 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -35,6 +35,8 @@ class ServerConfig(Config):
         # "disable" federation
         self.send_federation = config.get("send_federation", True)
 
+        self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
+
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
                 self.public_baseurl += '/'
@@ -161,6 +163,10 @@ class ServerConfig(Config):
         # The GC threshold parameters to pass to `gc.set_threshold`, if defined
         # gc_thresholds: [700, 10, 10]
 
+        # Set the limit on the returned events in the timeline in the get
+        # and sync operations. The default value is -1, means no upper limit.
+        # filter_timeline_limit: 5000
+
         # List of ports that Synapse should listen on, their purpose and their
         # configuration.
         listeners:
diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py
index 20e765f48f..1f5bc24cc3 100644
--- a/synapse/rest/client/v2_alpha/_base.py
+++ b/synapse/rest/client/v2_alpha/_base.py
@@ -47,3 +47,13 @@ def client_v2_patterns(path_regex, releases=(0,),
         new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release)
         patterns.append(re.compile("^" + new_prefix + path_regex))
     return patterns
+
+
+def set_timeline_upper_limit(filter_json, filter_timeline_limit):
+    if filter_timeline_limit < 0:
+        return  # no upper limits
+    timeline = filter_json.get('room', {}).get('timeline', {})
+    if 'limit' in timeline:
+        filter_json['room']['timeline']["limit"] = min(
+            filter_json['room']['timeline']['limit'],
+            filter_timeline_limit)
diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py
index b4084fec62..d2b2fd66e6 100644
--- a/synapse/rest/client/v2_alpha/filter.py
+++ b/synapse/rest/client/v2_alpha/filter.py
@@ -20,6 +20,7 @@ from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.types import UserID
 
 from ._base import client_v2_patterns
+from ._base import set_timeline_upper_limit
 
 import logging
 
@@ -85,6 +86,11 @@ class CreateFilterRestServlet(RestServlet):
             raise AuthError(403, "Can only create filters for local users")
 
         content = parse_json_object_from_request(request)
+        set_timeline_upper_limit(
+            content,
+            self.hs.config.filter_timeline_limit
+        )
+
         filter_id = yield self.filtering.add_user_filter(
             user_localpart=target_user.localpart,
             user_filter=content,
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index f30eab76fd..771e127ab9 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -28,6 +28,7 @@ from synapse.api.filtering import FilterCollection, DEFAULT_FILTER_COLLECTION
 from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from ._base import client_v2_patterns
+from ._base import set_timeline_upper_limit
 
 import itertools
 import logging
@@ -78,6 +79,7 @@ class SyncRestServlet(RestServlet):
 
     def __init__(self, hs):
         super(SyncRestServlet, self).__init__()
+        self.hs = hs
         self.auth = hs.get_auth()
         self.sync_handler = hs.get_sync_handler()
         self.clock = hs.get_clock()
@@ -121,6 +123,8 @@ class SyncRestServlet(RestServlet):
             if filter_id.startswith('{'):
                 try:
                     filter_object = json.loads(filter_id)
+                    set_timeline_upper_limit(filter_object,
+                                             self.hs.config.filter_timeline_limit)
                 except:
                     raise SynapseError(400, "Invalid filter JSON")
                 self.filtering.check_valid_filter(filter_object)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index c43b185e08..caca96c222 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError, HttpResponseException, \
 from synapse.util.async import Linearizer
 from synapse.util.stringutils import is_ascii
 from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.retryutils import NotRetryingDestination
 
 import os
 import errno
@@ -181,7 +182,8 @@ class MediaRepository(object):
                     logger.exception("Failed to fetch remote media %s/%s",
                                      server_name, media_id)
                     raise
-
+                except NotRetryingDestination:
+                    logger.warn("Not retrying destination %r", server_name)
                 except Exception:
                     logger.exception("Failed to fetch remote media %s/%s",
                                      server_name, media_id)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index d4cf0fc59b..7157fb1dfb 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -210,7 +210,9 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_handlers[update_name] = update_handler
 
     def register_background_index_update(self, update_name, index_name,
-                                         table, columns, where_clause=None):
+                                         table, columns, where_clause=None,
+                                         unique=False,
+                                         psql_only=False):
         """Helper for store classes to do a background index addition
 
         To use:
@@ -226,6 +228,9 @@ class BackgroundUpdateStore(SQLBaseStore):
             index_name (str): name of index to add
             table (str): table to add index to
             columns (list[str]): columns/expressions to include in index
+            unique (bool): true to make a UNIQUE index
+            psql_only: true to only create this index on psql databases (useful
+                for virtual sqlite tables)
         """
 
         def create_index_psql(conn):
@@ -245,9 +250,11 @@ class BackgroundUpdateStore(SQLBaseStore):
                 c.execute(sql)
 
                 sql = (
-                    "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s"
+                    "CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
+                    " ON %(table)s"
                     " (%(columns)s) %(where_clause)s"
                 ) % {
+                    "unique": "UNIQUE" if unique else "",
                     "name": index_name,
                     "table": table,
                     "columns": ", ".join(columns),
@@ -270,9 +277,10 @@ class BackgroundUpdateStore(SQLBaseStore):
             # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite
             # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
             sql = (
-                "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s"
+                "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
                 " (%(columns)s)"
             ) % {
+                "unique": "UNIQUE" if unique else "",
                 "name": index_name,
                 "table": table,
                 "columns": ", ".join(columns),
@@ -284,13 +292,16 @@ class BackgroundUpdateStore(SQLBaseStore):
 
         if isinstance(self.database_engine, engines.PostgresEngine):
             runner = create_index_psql
+        elif psql_only:
+            runner = None
         else:
             runner = create_index_sqlite
 
         @defer.inlineCallbacks
         def updater(progress, batch_size):
-            logger.info("Adding index %s to %s", index_name, table)
-            yield self.runWithConnection(runner)
+            if runner is not None:
+                logger.info("Adding index %s to %s", index_name, table)
+                yield self.runWithConnection(runner)
             yield self._end_background_update(update_name)
             defer.returnValue(1)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index dbd63078c6..ea6879c619 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -207,6 +207,18 @@ class EventsStore(SQLBaseStore):
             where_clause="contains_url = true AND outlier = false",
         )
 
+        # an event_id index on event_search is useful for the purge_history
+        # api. Plus it means we get to enforce some integrity with a UNIQUE
+        # clause
+        self.register_background_index_update(
+            "event_search_event_id_idx",
+            index_name="event_search_event_id_idx",
+            table="event_search",
+            columns=["event_id"],
+            unique=True,
+            psql_only=True,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
     def persist_events(self, events_and_contexts, backfilled=False):
@@ -2022,6 +2034,8 @@ class EventsStore(SQLBaseStore):
                 400, "topological_ordering is greater than forward extremeties"
             )
 
+        logger.debug("[purge] looking for events to delete")
+
         txn.execute(
             "SELECT event_id, state_key FROM events"
             " LEFT JOIN state_events USING (room_id, event_id)"
@@ -2030,6 +2044,14 @@ class EventsStore(SQLBaseStore):
         )
         event_rows = txn.fetchall()
 
+        to_delete = [
+            (event_id,) for event_id, state_key in event_rows
+            if state_key is None and not self.hs.is_mine_id(event_id)
+        ]
+        logger.info(
+            "[purge] found %i events before cutoff, of which %i are remote"
+            " non-state events to delete", len(event_rows), len(to_delete))
+
         for event_id, state_key in event_rows:
             txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
@@ -2080,6 +2102,7 @@ class EventsStore(SQLBaseStore):
         )
 
         state_rows = txn.fetchall()
+        logger.debug("[purge] found %i redundant state groups", len(state_rows))
 
         # make a set of the redundant state groups, so that we can look them up
         # efficiently
@@ -2173,10 +2196,6 @@ class EventsStore(SQLBaseStore):
         )
 
         # Delete all remote non-state events
-        to_delete = [
-            (event_id,) for event_id, state_key in event_rows
-            if state_key is None and not self.hs.is_mine_id(event_id)
-        ]
         for table in (
             "events",
             "event_json",
@@ -2192,7 +2211,7 @@ class EventsStore(SQLBaseStore):
             "event_signatures",
             "rejections",
         ):
-            logger.debug("[purge] removing non-state events from %s", table)
+            logger.debug("[purge] removing remote non-state events from %s", table)
 
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -2200,7 +2219,7 @@ class EventsStore(SQLBaseStore):
             )
 
         # Mark all state and own events as outliers
-        logger.debug("[purge] marking events as outliers")
+        logger.debug("[purge] marking remaining events as outliers")
         txn.executemany(
             "UPDATE events SET outlier = ?"
             " WHERE event_id = ?",
@@ -2210,7 +2229,7 @@ class EventsStore(SQLBaseStore):
             ]
         )
 
-        logger.debug("[purge] done")
+        logger.info("[purge] done")
 
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py
index 784f3b348f..20ad8bd5a6 100644
--- a/synapse/storage/schema/delta/37/remove_auth_idx.py
+++ b/synapse/storage/schema/delta/37/remove_auth_idx.py
@@ -36,6 +36,10 @@ DROP INDEX IF EXISTS transactions_have_ref;
 -- and is used incredibly rarely.
 DROP INDEX IF EXISTS events_order_topo_stream_room;
 
+-- an equivalent index to this actually gets re-created in delta 41, because it
+-- turned out that deleting it wasn't a great plan :/. In any case, let's
+-- delete it here, and delta 41 will create a new one with an added UNIQUE
+-- constraint
 DROP INDEX IF EXISTS event_search_ev_idx;
 """
 
diff --git a/synapse/storage/schema/delta/41/event_search_event_id_idx.sql b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql
new file mode 100644
index 0000000000..5d9cfecf36
--- /dev/null
+++ b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_search_event_id_idx', '{}');