summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/filtering.py9
-rwxr-xr-xsynapse/app/homeserver.py4
-rw-r--r--synapse/groups/attestations.py2
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/sync.py87
-rw-r--r--synapse/metrics/background_process_metrics.py10
-rw-r--r--synapse/rest/media/v1/media_repository.py2
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py2
-rw-r--r--synapse/storage/client_ips.py2
-rw-r--r--synapse/storage/devices.py2
-rw-r--r--synapse/storage/event_federation.py15
-rw-r--r--synapse/storage/event_push_actions.py4
-rw-r--r--synapse/storage/events.py1
-rw-r--r--synapse/storage/transactions.py4
-rw-r--r--synapse/util/caches/expiringcache.py2
15 files changed, 100 insertions, 48 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 7e767b9bf5..186831e118 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -117,6 +117,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
         "lazy_load_members": {
             "type": "boolean"
         },
+        "include_redundant_members": {
+            "type": "boolean"
+        },
     }
 }
 
@@ -267,6 +270,9 @@ class FilterCollection(object):
     def lazy_load_members(self):
         return self._room_state_filter.lazy_load_members()
 
+    def include_redundant_members(self):
+        return self._room_state_filter.include_redundant_members()
+
     def filter_presence(self, events):
         return self._presence_filter.filter(events)
 
@@ -426,6 +432,9 @@ class Filter(object):
     def lazy_load_members(self):
         return self.filter_json.get("lazy_load_members", False)
 
+    def include_redundant_members(self):
+        return self.filter_json.get("include_redundant_members", False)
+
 
 def _matches_wildcard(actual_value, filter_value):
     if filter_value.endswith("*"):
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b7e7718290..57b815d777 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -429,7 +429,7 @@ def run(hs):
     stats_process = []
 
     def start_phone_stats_home():
-        run_as_background_process("phone_stats_home", phone_stats_home)
+        return run_as_background_process("phone_stats_home", phone_stats_home)
 
     @defer.inlineCallbacks
     def phone_stats_home():
@@ -502,7 +502,7 @@ def run(hs):
             )
 
     def generate_user_daily_visit_stats():
-        run_as_background_process(
+        return run_as_background_process(
             "generate_user_daily_visits",
             hs.get_datastore().generate_user_daily_visits,
         )
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index 4216af0a27..b04f4234ca 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -153,7 +153,7 @@ class GroupAttestionRenewer(object):
         defer.returnValue({})
 
     def _start_renew_attestations(self):
-        run_as_background_process("renew_attestations", self._renew_attestations)
+        return run_as_background_process("renew_attestations", self._renew_attestations)
 
     @defer.inlineCallbacks
     def _renew_attestations(self):
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 43692b83a8..cb5c6d587e 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -256,7 +256,7 @@ class ProfileHandler(BaseHandler):
                 )
 
     def _start_update_remote_profile_cache(self):
-        run_as_background_process(
+        return run_as_background_process(
             "Update remote profile", self._update_remote_profile_cache,
         )
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4ced3144c8..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -26,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.types import RoomStreamToken
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
@@ -33,6 +35,14 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
@@ -182,6 +192,12 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
+
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -505,9 +521,13 @@ class SyncHandler(object):
         with Measure(self.clock, "compute_state_delta"):
 
             types = None
-            lazy_load_members = sync_config.filter_collection.lazy_load_members()
             filtered_types = None
 
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
             if lazy_load_members:
                 # We only request state for the members needed to display the
                 # timeline:
@@ -523,6 +543,11 @@ class SyncHandler(object):
                 # only apply the filtering to room members
                 filtered_types = [EventTypes.Member]
 
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
@@ -543,11 +568,6 @@ class SyncHandler(object):
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
@@ -571,21 +591,6 @@ class SyncHandler(object):
                     filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
-                # TODO: optionally filter out redundant membership events at this
-                # point, to stop repeatedly sending members in every /sync as if
-                # the client isn't tracking them.
-                # When implemented, this should filter using event_ids (not mxids).
-                # In practice, limited syncs are
-                # relatively rare so it's not a total disaster to send redundant
-                # members down at this point. Redundant members are ones which
-                # repeatedly get sent down /sync because we don't know if the client
-                # is caching them or not.
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
@@ -596,16 +601,48 @@ class SyncHandler(object):
             else:
                 state_ids = {}
                 if lazy_load_members:
-                    # TODO: filter out redundant members based on their mxids (not their
-                    # event_ids) at this point. We know we can do it based on mxid as this
-                    # is an non-gappy incremental sync.
-
                     if types:
                         state_ids = yield self.store.get_state_ids_for_event(
                             batch.events[0].event_id, types=types,
                             filtered_types=filtered_types,
                         )
 
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.lazy_loaded_members_cache.get(cache_key)
+                if cache is None:
+                    logger.debug("creating LruCache for %r", cache_key)
+                    cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+                    self.lazy_loaded_members_cache[cache_key] = cache
+                else:
+                    logger.debug("found LruCache for %r", cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
+
         state = {}
         if state_ids:
             state = yield self.store.get_events(list(state_ids.values()))
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 9d820e44a6..ce678d5f75 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -151,13 +151,19 @@ def run_as_background_process(desc, func, *args, **kwargs):
     This should be used to wrap processes which are fired off to run in the
     background, instead of being associated with a particular request.
 
+    It returns a Deferred which completes when the function completes, but it doesn't
+    follow the synapse logcontext rules, which makes it appropriate for passing to
+    clock.looping_call and friends (or for firing-and-forgetting in the middle of a
+    normal synapse inlineCallbacks function).
+
     Args:
         desc (str): a description for this background process type
         func: a function, which may return a Deferred
         args: positional args for func
         kwargs: keyword args for func
 
-    Returns: None
+    Returns: Deferred which returns the result of func, but note that it does not
+        follow the synapse logcontext rules.
     """
     @defer.inlineCallbacks
     def run():
@@ -176,4 +182,4 @@ def run_as_background_process(desc, func, *args, **kwargs):
                 _background_processes[desc].remove(proc)
 
     with PreserveLoggingContext():
-        run()
+        return run()
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 5b13378caa..174ad20123 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -106,7 +106,7 @@ class MediaRepository(object):
         )
 
     def _start_update_recently_accessed(self):
-        run_as_background_process(
+        return run_as_background_process(
             "update_recently_accessed_media", self._update_recently_accessed,
         )
 
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 4efd5339a4..27aa0def2f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -373,7 +373,7 @@ class PreviewUrlResource(Resource):
         })
 
     def _start_expire_url_cache_data(self):
-        run_as_background_process(
+        return run_as_background_process(
             "expire_url_cache_data", self._expire_url_cache_data,
         )
 
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 77ae10da3d..b8cefd43d6 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -102,7 +102,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 to_update,
             )
 
-        run_as_background_process(
+        return run_as_background_process(
             "update_client_ips", update,
         )
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 52dccb1507..c0943ecf91 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -712,7 +712,7 @@ class DeviceStore(SQLBaseStore):
 
             logger.info("Pruned %d device list outbound pokes", txn.rowcount)
 
-        run_as_background_process(
+        return run_as_background_process(
             "prune_old_outbound_device_pokes",
             self.runInteraction,
             "_prune_old_outbound_device_pokes",
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 65f2d19e20..5d3ee90017 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -114,9 +114,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         sql = (
             "SELECT b.event_id, MAX(e.depth) FROM events as e"
             " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+            " ON g.event_id = e.event_id"
             " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+            " ON g.prev_event_id = b.event_id"
             " WHERE b.room_id = ? AND g.is_state is ?"
             " GROUP BY b.event_id"
         )
@@ -330,8 +330,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             "SELECT depth, prev_event_id FROM event_edges"
             " INNER JOIN events"
             " ON prev_event_id = events.event_id"
-            " AND event_edges.room_id = events.room_id"
-            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " WHERE event_edges.event_id = ?"
             " AND event_edges.is_state = ?"
             " LIMIT ?"
         )
@@ -365,7 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
             txn.execute(
                 query,
-                (room_id, event_id, False, limit - len(event_results))
+                (event_id, False, limit - len(event_results))
             )
 
             for row in txn:
@@ -402,7 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
         query = (
             "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? AND is_state = ? "
+            "WHERE event_id = ? AND is_state = ? "
             "LIMIT ?"
         )
 
@@ -411,7 +410,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             for event_id in front:
                 txn.execute(
                     query,
-                    (room_id, event_id, False, limit - len(event_results))
+                    (event_id, False, limit - len(event_results))
                 )
 
                 for e_id, in txn:
@@ -549,7 +548,7 @@ class EventFederationStore(EventFederationWorkerStore):
                 sql,
                 (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )
-        run_as_background_process(
+        return run_as_background_process(
             "delete_old_forward_extrem_cache",
             self.runInteraction,
             "_delete_old_forward_extrem_cache",
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 4f44b0ad47..6840320641 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -460,7 +460,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             )
 
     def _find_stream_orderings_for_times(self):
-        run_as_background_process(
+        return run_as_background_process(
             "event_push_action_stream_orderings",
             self.runInteraction,
             "_find_stream_orderings_for_times",
@@ -790,7 +790,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         """, (room_id, user_id, stream_ordering))
 
     def _start_rotate_notifs(self):
-        run_as_background_process("rotate_notifs", self._rotate_notifs)
+        return run_as_background_process("rotate_notifs", self._rotate_notifs)
 
     @defer.inlineCallbacks
     def _rotate_notifs(self):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 94515cd153..2f482af3a1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -520,7 +520,6 @@ class EventsStore(EventsWorkerStore):
             iterable=list(new_latest_event_ids),
             retcols=["prev_event_id"],
             keyvalues={
-                "room_id": room_id,
                 "is_state": False,
             },
             desc="_calculate_new_extremeties",
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b4b479d94c..428e7fa36e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -273,7 +273,9 @@ class TransactionStore(SQLBaseStore):
         return self.cursor_to_dict(txn)
 
     def _start_cleanup_transactions(self):
-        run_as_background_process("cleanup_transactions", self._cleanup_transactions)
+        return run_as_background_process(
+            "cleanup_transactions", self._cleanup_transactions,
+        )
 
     def _cleanup_transactions(self):
         now = self._clock.time_msec()
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 465adc54a8..ce85b2ae11 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -64,7 +64,7 @@ class ExpiringCache(object):
             return
 
         def f():
-            run_as_background_process(
+            return run_as_background_process(
                 "prune_cache_%s" % self._cache_name,
                 self._prune_cache,
             )