summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/presence.py131
-rw-r--r--synapse/handlers/sync.py16
2 files changed, 84 insertions, 63 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 4197311a97..aed640450f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,6 +31,7 @@ from synapse.storage.presence import UserPresenceState
 
 from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
+from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 from synapse.types import UserID
 import synapse.metrics
@@ -209,57 +210,60 @@ class PresenceHandler(BaseHandler):
         """
         now = self.clock.time_msec()
 
-        # NOTE: We purposefully don't yield between now and when we've
-        # calculated what we want to do with the new states, to avoid races.
+        with Measure(self.clock, "presence_update_states"):
 
-        to_notify = {}  # Changes we want to notify everyone about
-        to_federation_ping = {}  # These need sending keep-alives
-        for new_state in new_states:
-            user_id = new_state.user_id
+            # NOTE: We purposefully don't yield between now and when we've
+            # calculated what we want to do with the new states, to avoid races.
 
-            # Its fine to not hit the database here, as the only thing not in
-            # the current state cache are OFFLINE states, where the only field
-            # of interest is last_active which is safe enough to assume is 0
-            # here.
-            prev_state = self.user_to_current_state.get(
-                user_id, UserPresenceState.default(user_id)
-            )
+            to_notify = {}  # Changes we want to notify everyone about
+            to_federation_ping = {}  # These need sending keep-alives
 
-            new_state, should_notify, should_ping = handle_update(
-                prev_state, new_state,
-                is_mine=self.hs.is_mine_id(user_id),
-                wheel_timer=self.wheel_timer,
-                now=now
-            )
+            for new_state in new_states:
+                user_id = new_state.user_id
 
-            self.user_to_current_state[user_id] = new_state
+                # Its fine to not hit the database here, as the only thing not in
+                # the current state cache are OFFLINE states, where the only field
+                # of interest is last_active which is safe enough to assume is 0
+                # here.
+                prev_state = self.user_to_current_state.get(
+                    user_id, UserPresenceState.default(user_id)
+                )
 
-            if should_notify:
-                to_notify[user_id] = new_state
-            elif should_ping:
-                to_federation_ping[user_id] = new_state
+                new_state, should_notify, should_ping = handle_update(
+                    prev_state, new_state,
+                    is_mine=self.hs.is_mine_id(user_id),
+                    wheel_timer=self.wheel_timer,
+                    now=now
+                )
 
-        # TODO: We should probably ensure there are no races hereafter
+                self.user_to_current_state[user_id] = new_state
 
-        presence_updates_counter.inc_by(len(new_states))
+                if should_notify:
+                    to_notify[user_id] = new_state
+                elif should_ping:
+                    to_federation_ping[user_id] = new_state
 
-        if to_notify:
-            notified_presence_counter.inc_by(len(to_notify))
-            yield self._persist_and_notify(to_notify.values())
+            # TODO: We should probably ensure there are no races hereafter
 
-        self.unpersisted_users_changes |= set(s.user_id for s in new_states)
-        self.unpersisted_users_changes -= set(to_notify.keys())
+            presence_updates_counter.inc_by(len(new_states))
 
-        to_federation_ping = {
-            user_id: state for user_id, state in to_federation_ping.items()
-            if user_id not in to_notify
-        }
-        if to_federation_ping:
-            _, _, hosts_to_states = yield self._get_interested_parties(
-                to_federation_ping.values()
-            )
+            if to_notify:
+                notified_presence_counter.inc_by(len(to_notify))
+                yield self._persist_and_notify(to_notify.values())
+
+            self.unpersisted_users_changes |= set(s.user_id for s in new_states)
+            self.unpersisted_users_changes -= set(to_notify.keys())
+
+            to_federation_ping = {
+                user_id: state for user_id, state in to_federation_ping.items()
+                if user_id not in to_notify
+            }
+            if to_federation_ping:
+                _, _, hosts_to_states = yield self._get_interested_parties(
+                    to_federation_ping.values()
+                )
 
-            self._push_to_remotes(hosts_to_states)
+                self._push_to_remotes(hosts_to_states)
 
     def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -267,26 +271,27 @@ class PresenceHandler(BaseHandler):
         """
         now = self.clock.time_msec()
 
-        # Fetch the list of users that *may* have timed out. Things may have
-        # changed since the timeout was set, so we won't necessarily have to
-        # take any action.
-        users_to_check = self.wheel_timer.fetch(now)
+        with Measure(self.clock, "presence_handle_timeouts"):
+            # Fetch the list of users that *may* have timed out. Things may have
+            # changed since the timeout was set, so we won't necessarily have to
+            # take any action.
+            users_to_check = self.wheel_timer.fetch(now)
 
-        states = [
-            self.user_to_current_state.get(
-                user_id, UserPresenceState.default(user_id)
-            )
-            for user_id in set(users_to_check)
-        ]
+            states = [
+                self.user_to_current_state.get(
+                    user_id, UserPresenceState.default(user_id)
+                )
+                for user_id in set(users_to_check)
+            ]
 
-        timers_fired_counter.inc_by(len(states))
+            timers_fired_counter.inc_by(len(states))
 
-        changes = handle_timeouts(
-            states,
-            is_mine_fn=self.hs.is_mine_id,
-            user_to_num_current_syncs=self.user_to_num_current_syncs,
-            now=now,
-        )
+            changes = handle_timeouts(
+                states,
+                is_mine_fn=self.hs.is_mine_id,
+                user_to_num_current_syncs=self.user_to_num_current_syncs,
+                now=now,
+            )
 
         preserve_fn(self._update_states)(changes)
 
@@ -423,13 +428,21 @@ class PresenceHandler(BaseHandler):
 
         hosts_to_states = {}
         for room_id, states in room_ids_to_states.items():
+            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
+
             hosts = yield self.store.get_joined_hosts_for_room(room_id)
             for host in hosts:
-                hosts_to_states.setdefault(host, []).extend(states)
+                hosts_to_states.setdefault(host, []).extend(local_states)
 
         for user_id, states in users_to_states.items():
+            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
+
             host = UserID.from_string(user_id).domain
-            hosts_to_states.setdefault(host, []).extend(states)
+            hosts_to_states.setdefault(host, []).extend(local_states)
 
         # TODO: de-dup hosts_to_states, as a single host might have multiple
         # of same presence
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 75e4ffb3ab..c87ff75c05 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -843,16 +843,20 @@ class SyncHandler(BaseHandler):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
-            current_state = yield self.get_state_at(
-                room_id, stream_position=now_token
-            )
-
             if full_state:
                 if batch:
+                    current_state = yield self.store.get_state_for_event(
+                        batch.events[-1].event_id
+                    )
+
                     state = yield self.store.get_state_for_event(
                         batch.events[0].event_id
                     )
                 else:
+                    current_state = yield self.get_state_at(
+                        room_id, stream_position=now_token
+                    )
+
                     state = current_state
 
                 timeline_state = {
@@ -871,6 +875,10 @@ class SyncHandler(BaseHandler):
                     room_id, stream_position=since_token
                 )
 
+                current_state = yield self.store.get_state_for_event(
+                    batch.events[-1].event_id
+                )
+
                 state_at_timeline_start = yield self.store.get_state_for_event(
                     batch.events[0].event_id
                 )