summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-03-15 16:01:29 +0000
committerGitHub <noreply@github.com>2017-03-15 16:01:29 +0000
commit9d527191bcd14f51ae43cfccf18a34ecb19df801 (patch)
tree272a697640aaf3c919db0b63967908e89b8f16f5 /synapse/handlers/presence.py
parentMerge pull request #1997 from matrix-org/dbkr/cas_partialdownload (diff)
parentComment (diff)
downloadsynapse-9d527191bcd14f51ae43cfccf18a34ecb19df801.tar.xz
Merge pull request #2013 from matrix-org/erikj/presence_FASTER
Format presence events on the edges instead of reformatting them multiple times
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py73
1 files changed, 45 insertions, 28 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da610e430f..9cc94287b3 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -29,6 +29,7 @@ from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from synapse.storage.presence import UserPresenceState
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -719,9 +720,7 @@ class PresenceHandler(object):
                 for state in updates
             ])
         else:
-            defer.returnValue([
-                format_user_presence_state(state, now) for state in updates
-            ])
+            defer.returnValue(updates)
 
     @defer.inlineCallbacks
     def set_state(self, target_user, state, ignore_status_msg=False):
@@ -795,6 +794,9 @@ class PresenceHandler(object):
             as_event=False,
         )
 
+        now = self.clock.time_msec()
+        results[:] = [format_user_presence_state(r, now) for r in results]
+
         is_accepted = {
             row["observed_user_id"]: row["accepted"] for row in presence_list
         }
@@ -847,6 +849,7 @@ class PresenceHandler(object):
             )
 
             state_dict = yield self.get_state(observed_user, as_event=False)
+            state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
 
             self.federation.send_edu(
                 destination=observer_user.domain,
@@ -979,14 +982,18 @@ def should_notify(old_state, new_state):
     return False
 
 
-def format_user_presence_state(state, now):
+def format_user_presence_state(state, now, include_user_id=True):
     """Convert UserPresenceState to a format that can be sent down to clients
     and to other servers.
+
+    The "user_id" is optional so that this function can be used to format presence
+    updates for client /sync responses and for federation /send requests.
     """
     content = {
         "presence": state.state,
-        "user_id": state.user_id,
     }
+    if include_user_id:
+        content["user_id"] = state.user_id
     if state.last_active_ts:
         content["last_active_ago"] = now - state.last_active_ts
     if state.status_msg and state.state != PresenceState.OFFLINE:
@@ -1025,7 +1032,6 @@ class PresenceEventSource(object):
         # sending down the rare duplicate is not a concern.
 
         with Measure(self.clock, "presence.get_new_events"):
-            user_id = user.to_string()
             if from_key is not None:
                 from_key = int(from_key)
 
@@ -1034,18 +1040,7 @@ class PresenceEventSource(object):
 
             max_token = self.store.get_current_presence_token()
 
-            plist = yield self.store.get_presence_list_accepted(user.localpart)
-            users_interested_in = set(row["observed_user_id"] for row in plist)
-            users_interested_in.add(user_id)  # So that we receive our own presence
-
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-            users_interested_in.update(users_who_share_room)
-
-            if explicit_room_id:
-                user_ids = yield self.store.get_users_in_room(explicit_room_id)
-                users_interested_in.update(user_ids)
+            users_interested_in = yield self._get_interested_in(user, explicit_room_id)
 
             user_ids_changed = set()
             changed = None
@@ -1073,16 +1068,13 @@ class PresenceEventSource(object):
 
             updates = yield presence.current_state_for_users(user_ids_changed)
 
-        now = self.clock.time_msec()
-
-        defer.returnValue(([
-            {
-                "type": "m.presence",
-                "content": format_user_presence_state(s, now),
-            }
-            for s in updates.values()
-            if include_offline or s.state != PresenceState.OFFLINE
-        ], max_token))
+        if include_offline:
+            defer.returnValue((updates.values(), max_token))
+        else:
+            defer.returnValue(([
+                s for s in updates.itervalues()
+                if s.state != PresenceState.OFFLINE
+            ], max_token))
 
     def get_current_key(self):
         return self.store.get_current_presence_token()
@@ -1090,6 +1082,31 @@ class PresenceEventSource(object):
     def get_pagination_rows(self, user, pagination_config, key):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
+    @cachedInlineCallbacks(num_args=2, cache_context=True)
+    def _get_interested_in(self, user, explicit_room_id, cache_context):
+        """Returns the set of users that the given user should see presence
+        updates for
+        """
+        user_id = user.to_string()
+        plist = yield self.store.get_presence_list_accepted(
+            user.localpart, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in.add(user_id)  # So that we receive our own presence
+
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in.update(users_who_share_room)
+
+        if explicit_room_id:
+            user_ids = yield self.store.get_users_in_room(
+                explicit_room_id, on_invalidate=cache_context.invalidate,
+            )
+            users_interested_in.update(user_ids)
+
+        defer.returnValue(users_interested_in)
+
 
 def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as