summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/device.py38
-rw-r--r--synapse/storage/event_federation.py17
-rw-r--r--synapse/storage/state.py14
3 files changed, 58 insertions, 11 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 8cb47ac417..ca7137f315 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.util import stringutils
@@ -246,30 +245,51 @@ class DeviceHandler(BaseHandler):
         # Then work out if any users have since joined
         rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
 
+        stream_ordering = RoomStreamToken.parse_stream_token(
+            from_token.room_key).stream
+
         possibly_changed = set(changed)
         for room_id in rooms_changed:
-            # Fetch  the current state at the time.
-            stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
-
+            # Fetch the current state at the time.
             try:
                 event_ids = yield self.store.get_forward_extremeties_for_room(
                     room_id, stream_ordering=stream_ordering
                 )
-                prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
-            except:
-                prev_state_ids = {}
+            except errors.StoreError:
+                # we have purged the stream_ordering index since the stream
+                # ordering: treat it the same as a new room
+                event_ids = []
 
             current_state_ids = yield self.state.get_current_state_ids(room_id)
 
+            # special-case for an empty prev state: include all members
+            # in the changed list
+            if not event_ids:
+                for key, event_id in current_state_ids.iteritems():
+                    etype, state_key = key
+                    if etype != EventTypes.Member:
+                        continue
+                    possibly_changed.add(state_key)
+                continue
+
+            # mapping from event_id -> state_dict
+            prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+
             # If there has been any change in membership, include them in the
             # possibly changed list. We'll check if they are joined below,
             # and we're not toooo worried about spuriously adding users.
             for key, event_id in current_state_ids.iteritems():
                 etype, state_key = key
-                if etype == EventTypes.Member:
-                    prev_event_id = prev_state_ids.get(key, None)
+                if etype != EventTypes.Member:
+                    continue
+
+                # check if this member has changed since any of the extremities
+                # at the stream_ordering, and add them to the list if so.
+                for state_dict in prev_state_ids.values():
+                    prev_event_id = state_dict.get(key, None)
                     if not prev_event_id or prev_event_id != event_id:
                         possibly_changed.add(state_key)
+                        break
 
         users_who_share_room = yield self.store.get_users_who_share_room_with_user(
             user_id
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index ee88c61954..256e50dc20 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -281,15 +281,30 @@ class EventFederationStore(SQLBaseStore):
         )
 
     def get_forward_extremeties_for_room(self, room_id, stream_ordering):
+        """For a given room_id and stream_ordering, return the forward
+        extremeties of the room at that point in "time".
+
+        Throws a StoreError if we have since purged the index for
+        stream_orderings from that point.
+
+        Args:
+            room_id (str):
+            stream_ordering (int):
+
+        Returns:
+            deferred, which resolves to a list of event_ids
+        """
         # We want to make the cache more effective, so we clamp to the last
         # change before the given ordering.
         last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
 
         # We don't always have a full stream_to_exterm_id table, e.g. after
         # the upgrade that introduced it, so we make sure we never ask for a
-        # try and pin to a stream_ordering from before a restart
+        # stream_ordering from before a restart
         last_change = max(self._stream_order_on_start, last_change)
 
+        # provided the last_change is recent enough, we now clamp the requested
+        # stream_ordering to it.
         if last_change > self.stream_ordering_month_ago:
             stream_ordering = min(last_change, stream_ordering)
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1b3800eb6a..84482d8285 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -413,7 +413,19 @@ class StateStore(SQLBaseStore):
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
     @defer.inlineCallbacks
-    def get_state_ids_for_events(self, event_ids, types):
+    def get_state_ids_for_events(self, event_ids, types=None):
+        """
+        Get the state dicts corresponding to a list of events
+
+        Args:
+            event_ids(list(str)): events whose state should be returned
+            types(list[(str, str)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. May be None, which
+                matches any key
+
+        Returns:
+            A deferred dict from event_id -> (type, state_key) -> state_event
+        """
         event_to_groups = yield self._get_state_group_for_events(
             event_ids,
         )