diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/presence.py | 35 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 25 |
2 files changed, 49 insertions, 11 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 73752b2f89..d22adadc38 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -191,6 +191,13 @@ class PresenceHandler(object): 5000, ) + self.clock.call_later( + 60, + self.clock.looping_call, + self._persist_unpersisted_changes, + 60 * 1000, + ) + metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer)) @defer.inlineCallbacks @@ -217,6 +224,27 @@ class PresenceHandler(object): logger.info("Finished _on_shutdown") @defer.inlineCallbacks + def _persist_unpersisted_changes(self): + """We periodically persist the unpersisted changes, as otherwise they + may stack up and slow down shutdown times. + """ + logger.info( + "Performing _persist_unpersisted_changes. Persiting %d unpersisted changes", + len(self.user_to_current_state) + ) + + unpersisted = self.unpersisted_users_changes + self.unpersisted_users_changes = set() + + if unpersisted: + yield self.store.update_presence([ + self.user_to_current_state[user_id] + for user_id in unpersisted + ]) + + logger.info("Finished _persist_unpersisted_changes") + + @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state @@ -922,7 +950,12 @@ def should_notify(old_state, new_state): if new_state.currently_active != old_state.currently_active: return True - if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: + if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: + # Only notify about last active bumps if we're not currently acive + if not (old_state.currently_active and new_state.currently_active): + return True + + elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. return True diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 91934b0c81..14f2032afa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -565,21 +565,26 @@ class SyncHandler(object): if sync_result_builder.since_token is not None: since_stream_id = int(sync_result_builder.since_token.to_device_key) - if since_stream_id: + if since_stream_id != int(now_token.to_device_key): + # We only delete messages when a new message comes in, but that's + # fine so long as we delete them at some point. + logger.debug("Deleting messages up to %d", since_stream_id) yield self.store.delete_messages_for_device( user_id, device_id, since_stream_id ) - logger.debug("Getting messages up to %d", now_token.to_device_key) - messages, stream_id = yield self.store.get_new_messages_for_device( - user_id, device_id, now_token.to_device_key - ) - logger.debug("Got messages up to %d: %r", stream_id, messages) - sync_result_builder.now_token = now_token.copy_and_replace( - "to_device_key", stream_id - ) - sync_result_builder.to_device = messages + logger.debug("Getting messages up to %d", now_token.to_device_key) + messages, stream_id = yield self.store.get_new_messages_for_device( + user_id, device_id, now_token.to_device_key + ) + logger.debug("Got messages up to %d: %r", stream_id, messages) + sync_result_builder.now_token = now_token.copy_and_replace( + "to_device_key", stream_id + ) + sync_result_builder.to_device = messages + else: + sync_result_builder.to_device = [] @defer.inlineCallbacks def _generate_sync_entry_for_account_data(self, sync_result_builder): |