summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-03-08 11:44:20 +0000
committerErik Johnston <erik@matrix.org>2019-03-08 11:44:20 +0000
commit8c4896668fd602e2822dfc48e992ed48bbc5adf7 (patch)
tree99c0052e1697e74379050ad7af4cef2d62f21c2e /synapse/handlers
parentFactor out soft fail checks (diff)
parentMerge pull request #4829 from matrix-org/erikj/device_list_seen_updates (diff)
downloadsynapse-8c4896668fd602e2822dfc48e992ed48bbc5adf7.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/soft_fail_impl
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/device.py50
-rw-r--r--synapse/handlers/register.py2
-rw-r--r--synapse/handlers/sync.py86
-rw-r--r--synapse/handlers/user_directory.py222
4 files changed, 155 insertions, 205 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c09a7c6280..b398848079 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -402,6 +402,12 @@ class DeviceHandler(DeviceWorkerHandler):
             user_id, device_ids, list(hosts)
         )
 
+        for device_id in device_ids:
+            logger.debug(
+                "Notifying about update %r/%r, ID: %r", user_id, device_id,
+                position,
+            )
+
         room_ids = yield self.store.get_rooms_for_user(user_id)
 
         yield self.notifier.on_new_event(
@@ -409,7 +415,7 @@ class DeviceHandler(DeviceWorkerHandler):
         )
 
         if hosts:
-            logger.info("Sending device list update notif to: %r", hosts)
+            logger.info("Sending device list update notif for %r to: %r", user_id, hosts)
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
 
@@ -479,15 +485,26 @@ class DeviceListEduUpdater(object):
 
         if get_domain_from_id(user_id) != origin:
             # TODO: Raise?
-            logger.warning("Got device list update edu for %r from %r", user_id, origin)
+            logger.warning(
+                "Got device list update edu for %r/%r from %r",
+                user_id, device_id, origin,
+            )
             return
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
+            logger.warning(
+                "Got device list update edu for %r/%r, but don't share a room",
+                user_id, device_id,
+            )
             return
 
+        logger.debug(
+            "Received device list update for %r/%r", user_id, device_id,
+        )
+
         self._pending_updates.setdefault(user_id, []).append(
             (device_id, stream_id, prev_ids, edu_content)
         )
@@ -505,10 +522,18 @@ class DeviceListEduUpdater(object):
                 # This can happen since we batch updates
                 return
 
+            for device_id, stream_id, prev_ids, content in pending_updates:
+                logger.debug(
+                    "Handling update %r/%r, ID: %r, prev: %r ",
+                    user_id, device_id, stream_id, prev_ids,
+                )
+
             # Given a list of updates we check if we need to resync. This
             # happens if we've missed updates.
             resync = yield self._need_to_do_resync(user_id, pending_updates)
 
+            logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
+
             if resync:
                 # Fetch all devices for the user.
                 origin = get_domain_from_id(user_id)
@@ -561,11 +586,21 @@ class DeviceListEduUpdater(object):
                     )
                     devices = []
 
+                for device in devices:
+                    logger.debug(
+                        "Handling resync update %r/%r, ID: %r",
+                        user_id, device["device_id"], stream_id,
+                    )
+
                 yield self.store.update_remote_device_list_cache(
                     user_id, devices, stream_id,
                 )
                 device_ids = [device["device_id"] for device in devices]
                 yield self.device_handler.notify_device_update(user_id, device_ids)
+
+                # We clobber the seen updates since we've re-synced from a given
+                # point.
+                self._seen_updates[user_id] = set([stream_id])
             else:
                 # Simply update the single device, since we know that is the only
                 # change (because of the single prev_id matching the current cache)
@@ -578,9 +613,9 @@ class DeviceListEduUpdater(object):
                     user_id, [device_id for device_id, _, _, _ in pending_updates]
                 )
 
-            self._seen_updates.setdefault(user_id, set()).update(
-                stream_id for _, stream_id, _, _ in pending_updates
-            )
+                self._seen_updates.setdefault(user_id, set()).update(
+                    stream_id for _, stream_id, _, _ in pending_updates
+                )
 
     @defer.inlineCallbacks
     def _need_to_do_resync(self, user_id, updates):
@@ -593,6 +628,11 @@ class DeviceListEduUpdater(object):
             user_id
         )
 
+        logger.debug(
+            "Current extremity for %r: %r",
+            user_id, extremity,
+        )
+
         stream_id_in_updates = set()  # stream_ids in updates list
         for _, stream_id, prev_ids, _ in updates:
             if not prev_ids:
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 47d5e276f8..03130edc54 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -61,7 +61,7 @@ class RegistrationHandler(BaseHandler):
         self.user_directory_handler = hs.get_user_directory_handler()
         self.captcha_client = CaptchaServerHttpClient(hs)
         self.identity_handler = self.hs.get_handlers().identity_handler
-        self.ratelimiter = hs.get_ratelimiter()
+        self.ratelimiter = hs.get_registration_ratelimiter()
 
         self._next_generated_user_id = None
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd97241ab4..57bb996245 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Debug logger for https://github.com/matrix-org/synapse/issues/4422
+issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+
 
 # Counts the number of times we returned a non-empty sync. `type` is one of
 # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -962,6 +965,15 @@ class SyncHandler(object):
 
         yield self._generate_sync_entry_for_groups(sync_result_builder)
 
+        # debug for https://github.com/matrix-org/synapse/issues/4422
+        for joined_room in sync_result_builder.joined:
+            room_id = joined_room.room_id
+            if room_id in newly_joined_rooms:
+                issue4422_logger.debug(
+                    "Sync result for newly joined room %s: %r",
+                    room_id, joined_room,
+                )
+
         defer.returnValue(SyncResult(
             presence=sync_result_builder.presence,
             account_data=sync_result_builder.account_data,
@@ -1425,6 +1437,17 @@ class SyncHandler(object):
                     old_mem_ev = yield self.store.get_event(
                         old_mem_ev_id, allow_none=True
                     )
+
+                # debug for #4422
+                if has_join:
+                    prev_membership = None
+                    if old_mem_ev:
+                        prev_membership = old_mem_ev.membership
+                    issue4422_logger.debug(
+                        "Previous membership for room %s with join: %s (event %s)",
+                        room_id, prev_membership, old_mem_ev_id,
+                    )
+
                 if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
                     newly_joined_rooms.append(room_id)
 
@@ -1519,30 +1542,39 @@ class SyncHandler(object):
         for room_id in sync_result_builder.joined_room_ids:
             room_entry = room_to_events.get(room_id, None)
 
+            newly_joined = room_id in newly_joined_rooms
             if room_entry:
                 events, start_key = room_entry
 
                 prev_batch_token = now_token.copy_and_replace("room_key", start_key)
 
-                room_entries.append(RoomSyncResultBuilder(
+                entry = RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="joined",
                     events=events,
-                    newly_joined=room_id in newly_joined_rooms,
+                    newly_joined=newly_joined,
                     full_state=False,
-                    since_token=None if room_id in newly_joined_rooms else since_token,
+                    since_token=None if newly_joined else since_token,
                     upto_token=prev_batch_token,
-                ))
+                )
             else:
-                room_entries.append(RoomSyncResultBuilder(
+                entry = RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="joined",
                     events=[],
-                    newly_joined=room_id in newly_joined_rooms,
+                    newly_joined=newly_joined,
                     full_state=False,
                     since_token=since_token,
                     upto_token=since_token,
-                ))
+                )
+
+            if newly_joined:
+                # debugging for https://github.com/matrix-org/synapse/issues/4422
+                issue4422_logger.debug(
+                    "RoomSyncResultBuilder events for newly joined room %s: %r",
+                    room_id, entry.events,
+                )
+            room_entries.append(entry)
 
         defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
 
@@ -1663,6 +1695,13 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
+        if newly_joined:
+            # debug for https://github.com/matrix-org/synapse/issues/4422
+            issue4422_logger.debug(
+                "Timeline events after filtering in newly-joined room %s: %r",
+                room_id, batch,
+            )
+
         # When we join the room (or the client requests full_state), we should
         # send down any existing tags. Usually the user won't have tags in a
         # newly joined room, unless either a) they've joined before or b) the
@@ -1894,15 +1933,34 @@ def _calculate_state(
 
 
 class SyncResultBuilder(object):
-    "Used to help build up a new SyncResult for a user"
+    """Used to help build up a new SyncResult for a user
+
+    Attributes:
+        sync_config (SyncConfig)
+        full_state (bool)
+        since_token (StreamToken)
+        now_token (StreamToken)
+        joined_room_ids (list[str])
+
+        # The following mirror the fields in a sync response
+        presence (list)
+        account_data (list)
+        joined (list[JoinedSyncResult])
+        invited (list[InvitedSyncResult])
+        archived (list[ArchivedSyncResult])
+        device (list)
+        groups (GroupsSyncResult|None)
+        to_device (list)
+    """
     def __init__(self, sync_config, full_state, since_token, now_token,
                  joined_room_ids):
         """
         Args:
-            sync_config(SyncConfig)
-            full_state(bool): The full_state flag as specified by user
-            since_token(StreamToken): The token supplied by user, or None.
-            now_token(StreamToken): The token to sync up to.
+            sync_config (SyncConfig)
+            full_state (bool): The full_state flag as specified by user
+            since_token (StreamToken): The token supplied by user, or None.
+            now_token (StreamToken): The token to sync up to.
+            joined_room_ids (list[str]): List of rooms the user is joined to
         """
         self.sync_config = sync_config
         self.full_state = full_state
@@ -1930,8 +1988,8 @@ class RoomSyncResultBuilder(object):
         Args:
             room_id(str)
             rtype(str): One of `"joined"` or `"archived"`
-            events(list): List of events to include in the room, (more events
-                may be added when generating result).
+            events(list[FrozenEvent]): List of events to include in the room
+                (more events may be added when generating result).
             newly_joined(bool): If the user has newly joined the room
             full_state(bool): Whether the full state should be sent in result
             since_token(StreamToken): Earliest point to return events from, or None
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..c21da8343a 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,7 +15,7 @@
 
 import logging
 
-from six import iteritems
+from six import iteritems, iterkeys
 
 from twisted.internet import defer
 
@@ -63,10 +63,6 @@ class UserDirectoryHandler(object):
         # When start up for the first time we need to populate the user_directory.
         # This is a set of user_id's we've inserted already
         self.initially_handled_users = set()
-        self.initially_handled_users_in_public = set()
-
-        self.initially_handled_users_share = set()
-        self.initially_handled_users_share_private_room = set()
 
         # The current position in the current_state_delta stream
         self.pos = None
@@ -140,7 +136,6 @@ class UserDirectoryHandler(object):
         # FIXME(#3714): We should probably do this in the same worker as all
         # the other changes.
         yield self.store.remove_from_user_dir(user_id)
-        yield self.store.remove_from_user_in_public_room(user_id)
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
@@ -215,15 +210,13 @@ class UserDirectoryHandler(object):
             logger.info("Processed all users")
 
         self.initially_handled_users = None
-        self.initially_handled_users_in_public = None
-        self.initially_handled_users_share = None
-        self.initially_handled_users_share_private_room = None
 
         yield self.store.update_user_directory_stream_pos(new_pos)
 
     @defer.inlineCallbacks
     def _handle_initial_room(self, room_id):
-        """Called when we initially fill out user_directory one room at a time
+        """
+        Called when we initially fill out user_directory one room at a time
         """
         is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
         if not is_in_room:
@@ -238,23 +231,15 @@ class UserDirectoryHandler(object):
         unhandled_users = user_ids - self.initially_handled_users
 
         yield self.store.add_profiles_to_user_dir(
-            room_id,
             {user_id: users_with_profile[user_id] for user_id in unhandled_users},
         )
 
         self.initially_handled_users |= unhandled_users
 
-        if is_public:
-            yield self.store.add_users_to_public_room(
-                room_id, user_ids=user_ids - self.initially_handled_users_in_public
-            )
-            self.initially_handled_users_in_public |= user_ids
-
         # We now go and figure out the new users who share rooms with user entries
         # We sleep aggressively here as otherwise it can starve resources.
         # We also batch up inserts/updates, but try to avoid too many at once.
         to_insert = set()
-        to_update = set()
         count = 0
         for user_id in user_ids:
             if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
@@ -277,21 +262,7 @@ class UserDirectoryHandler(object):
                 count += 1
 
                 user_set = (user_id, other_user_id)
-
-                if user_set in self.initially_handled_users_share_private_room:
-                    continue
-
-                if user_set in self.initially_handled_users_share:
-                    if is_public:
-                        continue
-                    to_update.add(user_set)
-                else:
-                    to_insert.add(user_set)
-
-                if is_public:
-                    self.initially_handled_users_share.add(user_set)
-                else:
-                    self.initially_handled_users_share_private_room.add(user_set)
+                to_insert.add(user_set)
 
                 if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.add_users_who_share_room(
@@ -299,22 +270,10 @@ class UserDirectoryHandler(object):
                     )
                     to_insert.clear()
 
-                if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.update_users_who_share_room(
-                        room_id, not is_public, to_update
-                    )
-                    to_update.clear()
-
         if to_insert:
             yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
             to_insert.clear()
 
-        if to_update:
-            yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
-            )
-            to_update.clear()
-
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process
@@ -356,6 +315,7 @@ class UserDirectoryHandler(object):
                         user_ids = yield self.store.get_users_in_dir_due_to_room(
                             room_id
                         )
+
                         for user_id in user_ids:
                             yield self._handle_remove_user(room_id, user_id)
                         return
@@ -436,14 +396,20 @@ class UserDirectoryHandler(object):
             # ignore the change
             return
 
-        if change:
-            users_with_profile = yield self.state.get_current_user_in_room(room_id)
-            for user_id, profile in iteritems(users_with_profile):
-                yield self._handle_new_user(room_id, user_id, profile)
-        else:
-            users = yield self.store.get_users_in_public_due_to_room(room_id)
-            for user_id in users:
-                yield self._handle_remove_user(room_id, user_id)
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+        # Remove every user from the sharing tables for that room.
+        for user_id in iterkeys(users_with_profile):
+            yield self.store.remove_user_who_share_room(user_id, room_id)
+
+        # Then, re-add them to the tables.
+        # NOTE: this is not the most efficient method, as handle_new_user sets
+        # up local_user -> other_user and other_user_whos_local -> local_user,
+        # which when ran over an entire room, will result in the same values
+        # being added multiple times. The batching upserts shouldn't make this
+        # too bad, though.
+        for user_id, profile in iteritems(users_with_profile):
+            yield self._handle_new_user(room_id, user_id, profile)
 
     @defer.inlineCallbacks
     def _handle_local_user(self, user_id):
@@ -457,7 +423,7 @@ class UserDirectoryHandler(object):
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
-            yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+            yield self.store.add_profiles_to_user_dir({user_id: profile})
 
     @defer.inlineCallbacks
     def _handle_new_user(self, room_id, user_id, profile):
@@ -471,55 +437,27 @@ class UserDirectoryHandler(object):
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
-            yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+            yield self.store.add_profiles_to_user_dir({user_id: profile})
 
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
             room_id
         )
-
-        if is_public:
-            row = yield self.store.get_user_in_public_room(user_id)
-            if not row:
-                yield self.store.add_users_to_public_room(room_id, [user_id])
-        else:
-            logger.debug("Not adding new user to public dir, %r", user_id)
-
-        # Now we update users who share rooms with users. We do this by getting
-        # all the current users in the room and seeing which aren't already
-        # marked in the database as sharing with `user_id`
-
+        # Now we update users who share rooms with users.
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
         to_insert = set()
-        to_update = set()
-
-        is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
 
         # First, if they're our user then we need to update for every user
-        if self.is_mine_id(user_id) and not is_appservice:
-            # Returns a map of other_user_id -> shared_private. We only need
-            # to update mappings if for users that either don't share a room
-            # already (aren't in the map) or, if the room is private, those that
-            # only share a public room.
-            user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
-                user_id
-            )
+        if self.is_mine_id(user_id):
 
-            for other_user_id in users_with_profile:
-                if user_id == other_user_id:
-                    continue
+            is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+
+            # We don't care about appservice users.
+            if not is_appservice:
+                for other_user_id in users_with_profile:
+                    if user_id == other_user_id:
+                        continue
 
-                shared_is_private = user_ids_shared.get(other_user_id)
-                if shared_is_private is True:
-                    # We've already marked in the database they share a private room
-                    continue
-                elif shared_is_private is False:
-                    # They already share a public room, so only update if this is
-                    # a private room
-                    if not is_public:
-                        to_update.add((user_id, other_user_id))
-                elif shared_is_private is None:
-                    # This is the first time they both share a room
                     to_insert.add((user_id, other_user_id))
 
         # Next we need to update for every local user in the room
@@ -531,29 +469,11 @@ class UserDirectoryHandler(object):
                 other_user_id
             )
             if self.is_mine_id(other_user_id) and not is_appservice:
-                shared_is_private = yield self.store.get_if_users_share_a_room(
-                    other_user_id, user_id
-                )
-                if shared_is_private is True:
-                    # We've already marked in the database they share a private room
-                    continue
-                elif shared_is_private is False:
-                    # They already share a public room, so only update if this is
-                    # a private room
-                    if not is_public:
-                        to_update.add((other_user_id, user_id))
-                elif shared_is_private is None:
-                    # This is the first time they both share a room
-                    to_insert.add((other_user_id, user_id))
+                to_insert.add((other_user_id, user_id))
 
         if to_insert:
             yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
 
-        if to_update:
-            yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
-            )
-
     @defer.inlineCallbacks
     def _handle_remove_user(self, room_id, user_id):
         """Called when we might need to remove user to directory
@@ -562,84 +482,16 @@ class UserDirectoryHandler(object):
             room_id (str): room_id that user left or stopped being public that
             user_id (str)
         """
-        logger.debug("Maybe removing user %r", user_id)
-
-        row = yield self.store.get_user_in_directory(user_id)
-        update_user_dir = row and row["room_id"] == room_id
-
-        row = yield self.store.get_user_in_public_room(user_id)
-        update_user_in_public = row and row["room_id"] == room_id
-
-        if update_user_in_public or update_user_dir:
-            # XXX: Make this faster?
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            for j_room_id in rooms:
-                if not update_user_in_public and not update_user_dir:
-                    break
-
-                is_in_room = yield self.store.is_host_joined(
-                    j_room_id, self.server_name
-                )
-
-                if not is_in_room:
-                    continue
-
-                if update_user_dir:
-                    update_user_dir = False
-                    yield self.store.update_user_in_user_dir(user_id, j_room_id)
+        logger.debug("Removing user %r", user_id)
 
-                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-                    j_room_id
-                )
+        # Remove user from sharing tables
+        yield self.store.remove_user_who_share_room(user_id, room_id)
 
-                if update_user_in_public and is_public:
-                    yield self.store.update_user_in_public_user_list(user_id, j_room_id)
-                    update_user_in_public = False
+        # Are they still in a room with members? If not, remove them entirely.
+        users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id)
 
-        if update_user_dir:
+        if len(users_in_room_with) == 0:
             yield self.store.remove_from_user_dir(user_id)
-        elif update_user_in_public:
-            yield self.store.remove_from_user_in_public_room(user_id)
-
-        # Now handle users_who_share_rooms.
-
-        # Get a list of user tuples that were in the DB due to this room and
-        # users (this includes tuples where the other user matches `user_id`)
-        user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
-            user_id, room_id
-        )
-
-        for user_id, other_user_id in user_tuples:
-            # For each user tuple get a list of rooms that they still share,
-            # trying to find a private room, and update the entry in the DB
-            rooms = yield self.store.get_rooms_in_common_for_users(
-                user_id, other_user_id
-            )
-
-            # If they dont share a room anymore, remove the mapping
-            if not rooms:
-                yield self.store.remove_user_who_share_room(user_id, other_user_id)
-                continue
-
-            found_public_share = None
-            for j_room_id in rooms:
-                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-                    j_room_id
-                )
-
-                if is_public:
-                    found_public_share = j_room_id
-                else:
-                    found_public_share = None
-                    yield self.store.update_users_who_share_room(
-                        room_id, not is_public, [(user_id, other_user_id)]
-                    )
-                    break
-
-            if found_public_share:
-                yield self.store.update_users_who_share_room(
-                    room_id, not is_public, [(user_id, other_user_id)]
-                )
 
     @defer.inlineCallbacks
     def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):