summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/handlers/device.py46
-rw-r--r--synapse/handlers/presence.py44
-rw-r--r--synapse/handlers/room.py1
-rw-r--r--synapse/handlers/sync.py3
-rw-r--r--synapse/notifier.py1
-rw-r--r--synapse/replication/slave/storage/events.py3
-rw-r--r--synapse/rest/client/v2_alpha/keys.py2
-rw-r--r--synapse/storage/roommember.py21
-rw-r--r--synapse/util/caches/descriptors.py5
10 files changed, 74 insertions, 54 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 498ded38c0..d3f445be9c 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.18.7"
+__version__ = "0.19.0"
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 815410969c..8cb47ac417 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,7 +17,8 @@ from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.util import stringutils
 from synapse.util.async import Linearizer
-from synapse.types import get_domain_from_id
+from synapse.util.metrics import measure_func
+from synapse.types import get_domain_from_id, RoomStreamToken
 from twisted.internet import defer
 from ._base import BaseHandler
 
@@ -193,25 +194,28 @@ class DeviceHandler(BaseHandler):
             else:
                 raise
 
+    @measure_func("notify_device_update")
     @defer.inlineCallbacks
     def notify_device_update(self, user_id, device_ids):
         """Notify that a user's device(s) has changed. Pokes the notifier, and
         remote servers if the user is local.
         """
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        room_ids = [r.room_id for r in rooms]
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
 
         hosts = set()
         if self.hs.is_mine_id(user_id):
-            for room_id in room_ids:
-                users = yield self.store.get_users_in_room(room_id)
-                hosts.update(get_domain_from_id(u) for u in users)
+            hosts.update(get_domain_from_id(u) for u in users_who_share_room)
             hosts.discard(self.server_name)
 
         position = yield self.store.add_device_change_to_streams(
             user_id, device_ids, list(hosts)
         )
 
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        room_ids = [r.room_id for r in rooms]
+
         yield self.notifier.on_new_event(
             "device_list_key", position, rooms=room_ids,
         )
@@ -221,6 +225,7 @@ class DeviceHandler(BaseHandler):
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
 
+    @measure_func("device.get_user_ids_changed")
     @defer.inlineCallbacks
     def get_user_ids_changed(self, user_id, from_token):
         """Get list of users that have had the devices updated, or have newly
@@ -243,15 +248,15 @@ class DeviceHandler(BaseHandler):
 
         possibly_changed = set(changed)
         for room_id in rooms_changed:
-            # Fetch (an approximation) of the current state at the time.
-            event_rows, token = yield self.store.get_recent_event_ids_for_room(
-                room_id, end_token=from_token.room_key, limit=1,
-            )
+            # Fetch  the current state at the time.
+            stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
 
-            if event_rows:
-                last_event_id = event_rows[-1]["event_id"]
-                prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id)
-            else:
+            try:
+                event_ids = yield self.store.get_forward_extremeties_for_room(
+                    room_id, stream_ordering=stream_ordering
+                )
+                prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+            except:
                 prev_state_ids = {}
 
             current_state_ids = yield self.state.get_current_state_ids(room_id)
@@ -266,14 +271,15 @@ class DeviceHandler(BaseHandler):
                     if not prev_event_id or prev_event_id != event_id:
                         possibly_changed.add(state_key)
 
-        user_ids_changed = set()
-        for other_user_id in possibly_changed:
-            other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-            if room_ids.intersection(e.room_id for e in other_rooms):
-                user_ids_changed.add(other_user_id)
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
 
-        defer.returnValue(user_ids_changed)
+        # Take the intersection of the users whose devices may have changed
+        # and those that actually still share a room with the user
+        defer.returnValue(users_who_share_room & possibly_changed)
 
+    @measure_func("_incoming_device_list_update")
     @defer.inlineCallbacks
     def _incoming_device_list_update(self, origin, edu_content):
         user_id = edu_content["user_id"]
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9982ae0fed..fdfce2a88c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1011,7 +1011,7 @@ class PresenceEventSource(object):
     @defer.inlineCallbacks
     @log_function
     def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
-                       **kwargs):
+                       explicit_room_id=None, **kwargs):
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1028,22 +1028,24 @@ class PresenceEventSource(object):
             user_id = user.to_string()
             if from_key is not None:
                 from_key = int(from_key)
-            room_ids = room_ids or []
 
             presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
 
-            if not room_ids:
-                rooms = yield self.store.get_rooms_for_user(user_id)
-                room_ids = set(e.room_id for e in rooms)
-            else:
-                room_ids = set(room_ids)
-
             max_token = self.store.get_current_presence_token()
 
             plist = yield self.store.get_presence_list_accepted(user.localpart)
-            friends = set(row["observed_user_id"] for row in plist)
-            friends.add(user_id)  # So that we receive our own presence
+            users_interested_in = set(row["observed_user_id"] for row in plist)
+            users_interested_in.add(user_id)  # So that we receive our own presence
+
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
+            users_interested_in.update(users_who_share_room)
+
+            if explicit_room_id:
+                user_ids = yield self.store.get_users_in_room(explicit_room_id)
+                users_interested_in.update(user_ids)
 
             user_ids_changed = set()
             changed = None
@@ -1055,35 +1057,19 @@ class PresenceEventSource(object):
                 # work out if we share a room or they're in our presence list
                 get_updates_counter.inc("stream")
                 for other_user_id in changed:
-                    if other_user_id in friends:
+                    if other_user_id in users_interested_in:
                         user_ids_changed.add(other_user_id)
-                        continue
-                    other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-                    if room_ids.intersection(e.room_id for e in other_rooms):
-                        user_ids_changed.add(other_user_id)
-                        continue
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
                 get_updates_counter.inc("full")
 
-                user_ids_to_check = set()
-                for room_id in room_ids:
-                    users = yield self.store.get_users_in_room(room_id)
-                    user_ids_to_check.update(users)
-
-                user_ids_to_check.update(friends)
-
-                # Always include yourself. Only really matters for when the user is
-                # not in any rooms, but still.
-                user_ids_to_check.add(user_id)
-
                 if from_key:
                     user_ids_changed = stream_change_cache.get_entities_changed(
-                        user_ids_to_check, from_key,
+                        users_interested_in, from_key,
                     )
                 else:
-                    user_ids_changed = user_ids_to_check
+                    user_ids_changed = users_interested_in
 
             updates = yield presence.current_state_for_users(user_ids_changed)
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5f18007e90..7e7671c9a2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -437,6 +437,7 @@ class RoomEventSource(object):
             limit,
             room_ids,
             is_guest,
+            explicit_room_id=None,
     ):
         # We just ignore the key for now.
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9a44de3f33..d7dcd1ce5b 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -16,7 +16,7 @@
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
 from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
@@ -561,6 +561,7 @@ class SyncHandler(object):
             next_batch=sync_result_builder.now_token,
         ))
 
+    @measure_func("_generate_sync_entry_for_device_list")
     @defer.inlineCallbacks
     def _generate_sync_entry_for_device_list(self, sync_result_builder):
         user_id = sync_result_builder.sync_config.user.to_string()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index acbd4bb5ae..8051a7a842 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -378,6 +378,7 @@ class Notifier(object):
                     limit=limit,
                     is_guest=is_peeking,
                     room_ids=room_ids,
+                    explicit_room_id=explicit_room_id,
                 )
 
                 if name == "room":
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 15a025a019..d72ff6055c 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -73,6 +73,9 @@ class SlavedEventStore(BaseSlavedStore):
     # to reach inside the __dict__ to extract them.
     get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
     get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
+    get_users_who_share_room_with_user = (
+        RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
+    )
     get_latest_event_ids_in_room = EventFederationStore.__dict__[
         "get_latest_event_ids_in_room"
     ]
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index f99b53530a..6a3cfe84f8 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -193,7 +193,7 @@ class KeyChangesServlet(RestServlet):
         )
 
         defer.returnValue((200, {
-            "changed": changed
+            "changed": list(changed),
         }))
 
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ee800d074f..545d3d3a99 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -129,7 +129,7 @@ class RoomMemberStore(SQLBaseStore):
         with self._stream_id_gen.get_next() as stream_ordering:
             yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
-    @cached(max_entries=100000, iterable=True)
+    @cached(max_entries=500000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
 
@@ -274,12 +274,29 @@ class RoomMemberStore(SQLBaseStore):
 
         return rows
 
-    @cached(max_entries=5000)
+    @cached(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
         return self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
         )
 
+    @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
+    def get_users_who_share_room_with_user(self, user_id, cache_context):
+        """Returns the set of users who share a room with `user_id`
+        """
+        rooms = yield self.get_rooms_for_user(
+            user_id, on_invalidate=cache_context.invalidate,
+        )
+
+        user_who_share_room = set()
+        for room in rooms:
+            user_ids = yield self.get_users_in_room(
+                room.room_id, on_invalidate=cache_context.invalidate,
+            )
+            user_who_share_room.update(user_ids)
+
+        defer.returnValue(user_who_share_room)
+
     def forget(self, user_id, room_id):
         """Indicate that user_id wishes to discard history for room_id."""
         def f(txn):
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 675bfd5feb..998de70d29 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -478,6 +478,11 @@ class CacheListDescriptor(object):
 
 
 class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
+    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
+    # which namedtuple does for us (i.e. two _CacheContext are the same if
+    # their caches and keys match). This is important in particular to
+    # dedupe when we add callbacks to lru cache nodes, otherwise the number
+    # of callbacks would grow.
     def invalidate(self):
         self.cache.invalidate(self.key)