diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c09a7c6280..b398848079 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -402,6 +402,12 @@ class DeviceHandler(DeviceWorkerHandler):
user_id, device_ids, list(hosts)
)
+ for device_id in device_ids:
+ logger.debug(
+ "Notifying about update %r/%r, ID: %r", user_id, device_id,
+ position,
+ )
+
room_ids = yield self.store.get_rooms_for_user(user_id)
yield self.notifier.on_new_event(
@@ -409,7 +415,7 @@ class DeviceHandler(DeviceWorkerHandler):
)
if hosts:
- logger.info("Sending device list update notif to: %r", hosts)
+ logger.info("Sending device list update notif for %r to: %r", user_id, hosts)
for host in hosts:
self.federation_sender.send_device_messages(host)
@@ -479,15 +485,26 @@ class DeviceListEduUpdater(object):
if get_domain_from_id(user_id) != origin:
# TODO: Raise?
- logger.warning("Got device list update edu for %r from %r", user_id, origin)
+ logger.warning(
+ "Got device list update edu for %r/%r from %r",
+ user_id, device_id, origin,
+ )
return
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
+ logger.warning(
+ "Got device list update edu for %r/%r, but don't share a room",
+ user_id, device_id,
+ )
return
+ logger.debug(
+ "Received device list update for %r/%r", user_id, device_id,
+ )
+
self._pending_updates.setdefault(user_id, []).append(
(device_id, stream_id, prev_ids, edu_content)
)
@@ -505,10 +522,18 @@ class DeviceListEduUpdater(object):
# This can happen since we batch updates
return
+ for device_id, stream_id, prev_ids, content in pending_updates:
+ logger.debug(
+ "Handling update %r/%r, ID: %r, prev: %r ",
+ user_id, device_id, stream_id, prev_ids,
+ )
+
# Given a list of updates we check if we need to resync. This
# happens if we've missed updates.
resync = yield self._need_to_do_resync(user_id, pending_updates)
+ logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
+
if resync:
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
@@ -561,11 +586,21 @@ class DeviceListEduUpdater(object):
)
devices = []
+ for device in devices:
+ logger.debug(
+ "Handling resync update %r/%r, ID: %r",
+ user_id, device["device_id"], stream_id,
+ )
+
yield self.store.update_remote_device_list_cache(
user_id, devices, stream_id,
)
device_ids = [device["device_id"] for device in devices]
yield self.device_handler.notify_device_update(user_id, device_ids)
+
+ # We clobber the seen updates since we've re-synced from a given
+ # point.
+ self._seen_updates[user_id] = set([stream_id])
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
@@ -578,9 +613,9 @@ class DeviceListEduUpdater(object):
user_id, [device_id for device_id, _, _, _ in pending_updates]
)
- self._seen_updates.setdefault(user_id, set()).update(
- stream_id for _, stream_id, _, _ in pending_updates
- )
+ self._seen_updates.setdefault(user_id, set()).update(
+ stream_id for _, stream_id, _, _ in pending_updates
+ )
@defer.inlineCallbacks
def _need_to_do_resync(self, user_id, updates):
@@ -593,6 +628,11 @@ class DeviceListEduUpdater(object):
user_id
)
+ logger.debug(
+ "Current extremity for %r: %r",
+ user_id, extremity,
+ )
+
stream_id_in_updates = set() # stream_ids in updates list
for _, stream_id, prev_ids, _ in updates:
if not prev_ids:
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 47d5e276f8..03130edc54 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -61,7 +61,7 @@ class RegistrationHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
self.identity_handler = self.hs.get_handlers().identity_handler
- self.ratelimiter = hs.get_ratelimiter()
+ self.ratelimiter = hs.get_registration_ratelimiter()
self._next_generated_user_id = None
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd97241ab4..57bb996245 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+# Debug logger for https://github.com/matrix-org/synapse/issues/4422
+issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -962,6 +965,15 @@ class SyncHandler(object):
yield self._generate_sync_entry_for_groups(sync_result_builder)
+ # debug for https://github.com/matrix-org/synapse/issues/4422
+ for joined_room in sync_result_builder.joined:
+ room_id = joined_room.room_id
+ if room_id in newly_joined_rooms:
+ issue4422_logger.debug(
+ "Sync result for newly joined room %s: %r",
+ room_id, joined_room,
+ )
+
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
@@ -1425,6 +1437,17 @@ class SyncHandler(object):
old_mem_ev = yield self.store.get_event(
old_mem_ev_id, allow_none=True
)
+
+ # debug for #4422
+ if has_join:
+ prev_membership = None
+ if old_mem_ev:
+ prev_membership = old_mem_ev.membership
+ issue4422_logger.debug(
+ "Previous membership for room %s with join: %s (event %s)",
+ room_id, prev_membership, old_mem_ev_id,
+ )
+
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id)
@@ -1519,30 +1542,39 @@ class SyncHandler(object):
for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)
+ newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
- room_entries.append(RoomSyncResultBuilder(
+ entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=events,
- newly_joined=room_id in newly_joined_rooms,
+ newly_joined=newly_joined,
full_state=False,
- since_token=None if room_id in newly_joined_rooms else since_token,
+ since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
- ))
+ )
else:
- room_entries.append(RoomSyncResultBuilder(
+ entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=[],
- newly_joined=room_id in newly_joined_rooms,
+ newly_joined=newly_joined,
full_state=False,
since_token=since_token,
upto_token=since_token,
- ))
+ )
+
+ if newly_joined:
+ # debugging for https://github.com/matrix-org/synapse/issues/4422
+ issue4422_logger.debug(
+ "RoomSyncResultBuilder events for newly joined room %s: %r",
+ room_id, entry.events,
+ )
+ room_entries.append(entry)
defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
@@ -1663,6 +1695,13 @@ class SyncHandler(object):
newly_joined_room=newly_joined,
)
+ if newly_joined:
+ # debug for https://github.com/matrix-org/synapse/issues/4422
+ issue4422_logger.debug(
+ "Timeline events after filtering in newly-joined room %s: %r",
+ room_id, batch,
+ )
+
# When we join the room (or the client requests full_state), we should
# send down any existing tags. Usually the user won't have tags in a
# newly joined room, unless either a) they've joined before or b) the
@@ -1894,15 +1933,34 @@ def _calculate_state(
class SyncResultBuilder(object):
- "Used to help build up a new SyncResult for a user"
+ """Used to help build up a new SyncResult for a user
+
+ Attributes:
+ sync_config (SyncConfig)
+ full_state (bool)
+ since_token (StreamToken)
+ now_token (StreamToken)
+ joined_room_ids (list[str])
+
+ # The following mirror the fields in a sync response
+ presence (list)
+ account_data (list)
+ joined (list[JoinedSyncResult])
+ invited (list[InvitedSyncResult])
+ archived (list[ArchivedSyncResult])
+ device (list)
+ groups (GroupsSyncResult|None)
+ to_device (list)
+ """
def __init__(self, sync_config, full_state, since_token, now_token,
joined_room_ids):
"""
Args:
- sync_config(SyncConfig)
- full_state(bool): The full_state flag as specified by user
- since_token(StreamToken): The token supplied by user, or None.
- now_token(StreamToken): The token to sync up to.
+ sync_config (SyncConfig)
+ full_state (bool): The full_state flag as specified by user
+ since_token (StreamToken): The token supplied by user, or None.
+ now_token (StreamToken): The token to sync up to.
+ joined_room_ids (list[str]): List of rooms the user is joined to
"""
self.sync_config = sync_config
self.full_state = full_state
@@ -1930,8 +1988,8 @@ class RoomSyncResultBuilder(object):
Args:
room_id(str)
rtype(str): One of `"joined"` or `"archived"`
- events(list): List of events to include in the room, (more events
- may be added when generating result).
+ events(list[FrozenEvent]): List of events to include in the room
+ (more events may be added when generating result).
newly_joined(bool): If the user has newly joined the room
full_state(bool): Whether the full state should be sent in result
since_token(StreamToken): Earliest point to return events from, or None
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..c21da8343a 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,7 +15,7 @@
import logging
-from six import iteritems
+from six import iteritems, iterkeys
from twisted.internet import defer
@@ -63,10 +63,6 @@ class UserDirectoryHandler(object):
# When start up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already
self.initially_handled_users = set()
- self.initially_handled_users_in_public = set()
-
- self.initially_handled_users_share = set()
- self.initially_handled_users_share_private_room = set()
# The current position in the current_state_delta stream
self.pos = None
@@ -140,7 +136,6 @@ class UserDirectoryHandler(object):
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id)
- yield self.store.remove_from_user_in_public_room(user_id)
@defer.inlineCallbacks
def _unsafe_process(self):
@@ -215,15 +210,13 @@ class UserDirectoryHandler(object):
logger.info("Processed all users")
self.initially_handled_users = None
- self.initially_handled_users_in_public = None
- self.initially_handled_users_share = None
- self.initially_handled_users_share_private_room = None
yield self.store.update_user_directory_stream_pos(new_pos)
@defer.inlineCallbacks
def _handle_initial_room(self, room_id):
- """Called when we initially fill out user_directory one room at a time
+ """
+ Called when we initially fill out user_directory one room at a time
"""
is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
if not is_in_room:
@@ -238,23 +231,15 @@ class UserDirectoryHandler(object):
unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
- room_id,
{user_id: users_with_profile[user_id] for user_id in unhandled_users},
)
self.initially_handled_users |= unhandled_users
- if is_public:
- yield self.store.add_users_to_public_room(
- room_id, user_ids=user_ids - self.initially_handled_users_in_public
- )
- self.initially_handled_users_in_public |= user_ids
-
# We now go and figure out the new users who share rooms with user entries
# We sleep aggressively here as otherwise it can starve resources.
# We also batch up inserts/updates, but try to avoid too many at once.
to_insert = set()
- to_update = set()
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
@@ -277,21 +262,7 @@ class UserDirectoryHandler(object):
count += 1
user_set = (user_id, other_user_id)
-
- if user_set in self.initially_handled_users_share_private_room:
- continue
-
- if user_set in self.initially_handled_users_share:
- if is_public:
- continue
- to_update.add(user_set)
- else:
- to_insert.add(user_set)
-
- if is_public:
- self.initially_handled_users_share.add(user_set)
- else:
- self.initially_handled_users_share_private_room.add(user_set)
+ to_insert.add(user_set)
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_room(
@@ -299,22 +270,10 @@ class UserDirectoryHandler(object):
)
to_insert.clear()
- if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
- to_update.clear()
-
if to_insert:
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
to_insert.clear()
- if to_update:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
- to_update.clear()
-
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
@@ -356,6 +315,7 @@ class UserDirectoryHandler(object):
user_ids = yield self.store.get_users_in_dir_due_to_room(
room_id
)
+
for user_id in user_ids:
yield self._handle_remove_user(room_id, user_id)
return
@@ -436,14 +396,20 @@ class UserDirectoryHandler(object):
# ignore the change
return
- if change:
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
- for user_id, profile in iteritems(users_with_profile):
- yield self._handle_new_user(room_id, user_id, profile)
- else:
- users = yield self.store.get_users_in_public_due_to_room(room_id)
- for user_id in users:
- yield self._handle_remove_user(room_id, user_id)
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+ # Remove every user from the sharing tables for that room.
+ for user_id in iterkeys(users_with_profile):
+ yield self.store.remove_user_who_share_room(user_id, room_id)
+
+ # Then, re-add them to the tables.
+ # NOTE: this is not the most efficient method, as handle_new_user sets
+ # up local_user -> other_user and other_user_whos_local -> local_user,
+ # which when ran over an entire room, will result in the same values
+ # being added multiple times. The batching upserts shouldn't make this
+ # too bad, though.
+ for user_id, profile in iteritems(users_with_profile):
+ yield self._handle_new_user(room_id, user_id, profile)
@defer.inlineCallbacks
def _handle_local_user(self, user_id):
@@ -457,7 +423,7 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id)
if not row:
- yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+ yield self.store.add_profiles_to_user_dir({user_id: profile})
@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
@@ -471,55 +437,27 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id)
if not row:
- yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+ yield self.store.add_profiles_to_user_dir({user_id: profile})
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
-
- if is_public:
- row = yield self.store.get_user_in_public_room(user_id)
- if not row:
- yield self.store.add_users_to_public_room(room_id, [user_id])
- else:
- logger.debug("Not adding new user to public dir, %r", user_id)
-
- # Now we update users who share rooms with users. We do this by getting
- # all the current users in the room and seeing which aren't already
- # marked in the database as sharing with `user_id`
-
+ # Now we update users who share rooms with users.
users_with_profile = yield self.state.get_current_user_in_room(room_id)
to_insert = set()
- to_update = set()
-
- is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
# First, if they're our user then we need to update for every user
- if self.is_mine_id(user_id) and not is_appservice:
- # Returns a map of other_user_id -> shared_private. We only need
- # to update mappings if for users that either don't share a room
- # already (aren't in the map) or, if the room is private, those that
- # only share a public room.
- user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
- user_id
- )
+ if self.is_mine_id(user_id):
- for other_user_id in users_with_profile:
- if user_id == other_user_id:
- continue
+ is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+
+ # We don't care about appservice users.
+ if not is_appservice:
+ for other_user_id in users_with_profile:
+ if user_id == other_user_id:
+ continue
- shared_is_private = user_ids_shared.get(other_user_id)
- if shared_is_private is True:
- # We've already marked in the database they share a private room
- continue
- elif shared_is_private is False:
- # They already share a public room, so only update if this is
- # a private room
- if not is_public:
- to_update.add((user_id, other_user_id))
- elif shared_is_private is None:
- # This is the first time they both share a room
to_insert.add((user_id, other_user_id))
# Next we need to update for every local user in the room
@@ -531,29 +469,11 @@ class UserDirectoryHandler(object):
other_user_id
)
if self.is_mine_id(other_user_id) and not is_appservice:
- shared_is_private = yield self.store.get_if_users_share_a_room(
- other_user_id, user_id
- )
- if shared_is_private is True:
- # We've already marked in the database they share a private room
- continue
- elif shared_is_private is False:
- # They already share a public room, so only update if this is
- # a private room
- if not is_public:
- to_update.add((other_user_id, user_id))
- elif shared_is_private is None:
- # This is the first time they both share a room
- to_insert.add((other_user_id, user_id))
+ to_insert.add((other_user_id, user_id))
if to_insert:
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
- if to_update:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, to_update
- )
-
@defer.inlineCallbacks
def _handle_remove_user(self, room_id, user_id):
"""Called when we might need to remove user to directory
@@ -562,84 +482,16 @@ class UserDirectoryHandler(object):
room_id (str): room_id that user left or stopped being public that
user_id (str)
"""
- logger.debug("Maybe removing user %r", user_id)
-
- row = yield self.store.get_user_in_directory(user_id)
- update_user_dir = row and row["room_id"] == room_id
-
- row = yield self.store.get_user_in_public_room(user_id)
- update_user_in_public = row and row["room_id"] == room_id
-
- if update_user_in_public or update_user_dir:
- # XXX: Make this faster?
- rooms = yield self.store.get_rooms_for_user(user_id)
- for j_room_id in rooms:
- if not update_user_in_public and not update_user_dir:
- break
-
- is_in_room = yield self.store.is_host_joined(
- j_room_id, self.server_name
- )
-
- if not is_in_room:
- continue
-
- if update_user_dir:
- update_user_dir = False
- yield self.store.update_user_in_user_dir(user_id, j_room_id)
+ logger.debug("Removing user %r", user_id)
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- j_room_id
- )
+ # Remove user from sharing tables
+ yield self.store.remove_user_who_share_room(user_id, room_id)
- if update_user_in_public and is_public:
- yield self.store.update_user_in_public_user_list(user_id, j_room_id)
- update_user_in_public = False
+ # Are they still in a room with members? If not, remove them entirely.
+ users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id)
- if update_user_dir:
+ if len(users_in_room_with) == 0:
yield self.store.remove_from_user_dir(user_id)
- elif update_user_in_public:
- yield self.store.remove_from_user_in_public_room(user_id)
-
- # Now handle users_who_share_rooms.
-
- # Get a list of user tuples that were in the DB due to this room and
- # users (this includes tuples where the other user matches `user_id`)
- user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
- user_id, room_id
- )
-
- for user_id, other_user_id in user_tuples:
- # For each user tuple get a list of rooms that they still share,
- # trying to find a private room, and update the entry in the DB
- rooms = yield self.store.get_rooms_in_common_for_users(
- user_id, other_user_id
- )
-
- # If they dont share a room anymore, remove the mapping
- if not rooms:
- yield self.store.remove_user_who_share_room(user_id, other_user_id)
- continue
-
- found_public_share = None
- for j_room_id in rooms:
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- j_room_id
- )
-
- if is_public:
- found_public_share = j_room_id
- else:
- found_public_share = None
- yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)]
- )
- break
-
- if found_public_share:
- yield self.store.update_users_who_share_room(
- room_id, not is_public, [(user_id, other_user_id)]
- )
@defer.inlineCallbacks
def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
|