summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-09-02 10:59:24 +0100
committerMark Haines <mark.haines@matrix.org>2016-09-02 10:59:24 +0100
commit965168a842f62958bd9bbb33d2ef4678f2f48b27 (patch)
treee2cb7b2327f8de1f5aec03d0f91b461e09940541 /synapse/handlers
parentFix up the calls to the notifier for device messages (diff)
parentMerge pull request #1061 from matrix-org/erikj/linearize_resolution (diff)
downloadsynapse-965168a842f62958bd9bbb33d2ef4678f2f48b27.tar.xz
Merge branch 'develop' into markjh/direct_to_device_synchrotron
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py31
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/presence.py35
-rw-r--r--synapse/handlers/room_member.py6
-rw-r--r--synapse/handlers/sync.py25
5 files changed, 79 insertions, 28 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a7ea8fb98f..8e61d74b13 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -222,7 +222,7 @@ class FederationHandler(BaseHandler):
                 # joined the room. Don't bother if the user is just
                 # changing their profile info.
                 newly_joined = True
-                prev_state_id = context.current_state_ids.get(
+                prev_state_id = context.prev_state_ids.get(
                     (event.type, event.state_key)
                 )
                 if prev_state_id:
@@ -835,12 +835,12 @@ class FederationHandler(BaseHandler):
 
         self.replication_layer.send_pdu(new_pdu, destinations)
 
-        state_ids = context.current_state_ids.values()
+        state_ids = context.prev_state_ids.values()
         auth_chain = yield self.store.get_auth_chain(set(
             [event.event_id] + state_ids
         ))
 
-        state = yield self.store.get_events(context.current_state_ids.values())
+        state = yield self.store.get_events(context.prev_state_ids.values())
 
         defer.returnValue({
             "state": state.values(),
@@ -1333,7 +1333,7 @@ class FederationHandler(BaseHandler):
 
         if not auth_events:
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.current_state_ids, for_verification=True,
+                event, context.prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -1432,6 +1432,11 @@ class FederationHandler(BaseHandler):
         current_state = set(e.event_id for e in auth_events.values())
         event_auth_events = set(e_id for e_id, _ in event.auth_events)
 
+        if event.is_state():
+            event_key = (event.type, event.state_key)
+        else:
+            event_key = None
+
         if event_auth_events - current_state:
             have_events = yield self.store.have_events(
                 event_auth_events - current_state
@@ -1537,8 +1542,12 @@ class FederationHandler(BaseHandler):
 
                 context.current_state_ids.update({
                     k: a.event_id for k, a in auth_events.items()
+                    if k != event_key
+                })
+                context.prev_state_ids.update({
+                    k: a.event_id for k, a in auth_events.items()
                 })
-                context.state_group = None
+                context.state_group = self.store.get_next_state_group()
 
         if different_auth and not event.internal_metadata.is_outlier():
             logger.info("Different auth after resolution: %s", different_auth)
@@ -1560,7 +1569,7 @@ class FederationHandler(BaseHandler):
             if do_resolution:
                 # 1. Get what we think is the auth chain.
                 auth_ids = yield self.auth.compute_auth_events(
-                    event, context.current_state_ids
+                    event, context.prev_state_ids
                 )
                 local_auth_chain = yield self.store.get_auth_chain(auth_ids)
 
@@ -1618,8 +1627,12 @@ class FederationHandler(BaseHandler):
 
                 context.current_state_ids.update({
                     k: a.event_id for k, a in auth_events.items()
+                    if k != event_key
+                })
+                context.prev_state_ids.update({
+                    k: a.event_id for k, a in auth_events.items()
                 })
-                context.state_group = None
+                context.state_group = self.store.get_next_state_group()
 
         try:
             self.auth.check(event, auth_events=auth_events)
@@ -1855,7 +1868,7 @@ class FederationHandler(BaseHandler):
             event.content["third_party_invite"]["signed"]["token"]
         )
         original_invite = None
-        original_invite_id = context.current_state_ids.get(key)
+        original_invite_id = context.prev_state_ids.get(key)
         if original_invite_id:
             original_invite = yield self.store.get_event(
                 original_invite_id, allow_none=True
@@ -1893,7 +1906,7 @@ class FederationHandler(BaseHandler):
         signed = event.content["third_party_invite"]["signed"]
         token = signed["token"]
 
-        invite_event_id = context.current_state_ids.get(
+        invite_event_id = context.prev_state_ids.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e2f4387f60..3577db0595 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -272,7 +272,7 @@ class MessageHandler(BaseHandler):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_event_id = context.current_state_ids.get((event.type, event.state_key))
+        prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
         prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
@@ -808,8 +808,8 @@ class MessageHandler(BaseHandler):
         event = builder.build()
 
         logger.debug(
-            "Created event %s with current state: %s",
-            event.event_id, context.current_state_ids,
+            "Created event %s with state: %s",
+            event.event_id, context.prev_state_ids,
         )
 
         defer.returnValue(
@@ -904,7 +904,7 @@ class MessageHandler(BaseHandler):
 
         if event.type == EventTypes.Redaction:
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.current_state_ids, for_verification=True,
+                event, context.prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -924,7 +924,7 @@ class MessageHandler(BaseHandler):
                         "You don't have permission to redact events"
                     )
 
-        if event.type == EventTypes.Create and context.current_state_ids:
+        if event.type == EventTypes.Create and context.prev_state_ids:
             raise AuthError(
                 403,
                 "Changing the room create event is forbidden",
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 73752b2f89..cf82a2336e 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -191,6 +191,13 @@ class PresenceHandler(object):
             5000,
         )
 
+        self.clock.call_later(
+            60,
+            self.clock.looping_call,
+            self._persist_unpersisted_changes,
+            60 * 1000,
+        )
+
         metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
 
     @defer.inlineCallbacks
@@ -217,6 +224,27 @@ class PresenceHandler(object):
         logger.info("Finished _on_shutdown")
 
     @defer.inlineCallbacks
+    def _persist_unpersisted_changes(self):
+        """We periodically persist the unpersisted changes, as otherwise they
+        may stack up and slow down shutdown times.
+        """
+        logger.info(
+            "Performing _persist_unpersisted_changes. Persiting %d unpersisted changes",
+            len(self.unpersisted_users_changes)
+        )
+
+        unpersisted = self.unpersisted_users_changes
+        self.unpersisted_users_changes = set()
+
+        if unpersisted:
+            yield self.store.update_presence([
+                self.user_to_current_state[user_id]
+                for user_id in unpersisted
+            ])
+
+        logger.info("Finished _persist_unpersisted_changes")
+
+    @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
         the notifier and federation if and only if the changed presence state
@@ -922,7 +950,12 @@ def should_notify(old_state, new_state):
         if new_state.currently_active != old_state.currently_active:
             return True
 
-    if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+        if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+            # Only notify about last active bumps if we're not currently acive
+            if not (old_state.currently_active and new_state.currently_active):
+                return True
+
+    elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
         # Always notify for a transition where last active gets bumped.
         return True
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index dd4b90ee24..3ba5335af7 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -93,7 +93,7 @@ class RoomMemberHandler(BaseHandler):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.current_state_ids.get(
+        prev_member_event_id = context.prev_state_ids.get(
             (EventTypes.Member, target.to_string()),
             None
         )
@@ -341,7 +341,7 @@ class RoomMemberHandler(BaseHandler):
 
         if event.membership == Membership.JOIN:
             if requester.is_guest:
-                guest_can_join = yield self._can_guest_join(context.current_state_ids)
+                guest_can_join = yield self._can_guest_join(context.prev_state_ids)
                 if not guest_can_join:
                     # This should be an auth check, but guests are a local concept,
                     # so don't really fit into the general auth process.
@@ -355,7 +355,7 @@ class RoomMemberHandler(BaseHandler):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.current_state_ids.get(
+        prev_member_event_id = context.prev_state_ids.get(
             (EventTypes.Member, event.state_key),
             None
         )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 91934b0c81..14f2032afa 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -565,21 +565,26 @@ class SyncHandler(object):
         if sync_result_builder.since_token is not None:
             since_stream_id = int(sync_result_builder.since_token.to_device_key)
 
-        if since_stream_id:
+        if since_stream_id != int(now_token.to_device_key):
+            # We only delete messages when a new message comes in, but that's
+            # fine so long as we delete them at some point.
+
             logger.debug("Deleting messages up to %d", since_stream_id)
             yield self.store.delete_messages_for_device(
                 user_id, device_id, since_stream_id
             )
 
-        logger.debug("Getting messages up to %d", now_token.to_device_key)
-        messages, stream_id = yield self.store.get_new_messages_for_device(
-            user_id, device_id, now_token.to_device_key
-        )
-        logger.debug("Got messages up to %d: %r", stream_id, messages)
-        sync_result_builder.now_token = now_token.copy_and_replace(
-            "to_device_key", stream_id
-        )
-        sync_result_builder.to_device = messages
+            logger.debug("Getting messages up to %d", now_token.to_device_key)
+            messages, stream_id = yield self.store.get_new_messages_for_device(
+                user_id, device_id, now_token.to_device_key
+            )
+            logger.debug("Got messages up to %d: %r", stream_id, messages)
+            sync_result_builder.now_token = now_token.copy_and_replace(
+                "to_device_key", stream_id
+            )
+            sync_result_builder.to_device = messages
+        else:
+            sync_result_builder.to_device = []
 
     @defer.inlineCallbacks
     def _generate_sync_entry_for_account_data(self, sync_result_builder):