-rw-r--r--  changelog.d/4719.misc                               1
-rw-r--r--  changelog.d/4734.misc                               1
-rw-r--r--  synapse/app/federation_reader.py                    6
-rw-r--r--  synapse/federation/federation_server.py             3
-rw-r--r--  synapse/federation/transaction_queue.py            55
-rw-r--r--  synapse/handlers/message.py                         2
-rw-r--r--  synapse/handlers/room_list.py                       9
-rw-r--r--  synapse/handlers/room_member.py                    44
-rw-r--r--  synapse/handlers/sync.py                           54
-rw-r--r--  synapse/handlers/user_directory.py                110
-rw-r--r--  synapse/push/httppusher.py                          4
-rw-r--r--  synapse/replication/slave/storage/client_ips.py     2
-rw-r--r--  synapse/replication/tcp/protocol.py                 4
-rw-r--r--  synapse/replication/tcp/streams.py                  6
-rw-r--r--  synapse/rest/client/v1/admin.py                    22
-rw-r--r--  synapse/storage/client_ips.py                       2
-rw-r--r--  synapse/storage/roommember.py                      11
-rw-r--r--  synapse/storage/search.py                           2
-rw-r--r--  synapse/storage/stream.py                          15
-rw-r--r--  tests/rest/client/v1/test_rooms.py                 12
20 files changed, 238 insertions, 127 deletions
diff --git a/changelog.d/4719.misc b/changelog.d/4719.misc
new file mode 100644
index 0000000000..8bc536ab66
--- /dev/null
+++ b/changelog.d/4719.misc
@@ -0,0 +1 @@
+Add more debug for membership syncing issues.
diff --git a/changelog.d/4734.misc b/changelog.d/4734.misc
new file mode 100644
index 0000000000..f4e3aeb44f
--- /dev/null
+++ b/changelog.d/4734.misc
@@ -0,0 +1 @@
+Add some debug to help with #4733.
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index b116c17669..7da79dc827 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
 
 import synapse
 from synapse import events
-from synapse.api.urls import FEDERATION_PREFIX
+from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX
 from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
@@ -44,6 +44,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
@@ -99,6 +100,9 @@ class FederationReaderServer(HomeServer):
                 ),
             })
 
+            if name in ["keys", "federation"]:
+                resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
+
         root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 569eb277a9..34e60d0a73 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -886,6 +886,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
     def on_edu(self, edu_type, origin, content):
         """Overrides FederationHandlerRegistry
         """
+        if edu_type == "m.presence":
+            return
+
         handler = self.edu_handlers.get(edu_type)
         if handler:
             return super(ReplicationFederationHandlerRegistry, self).on_edu(
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 30941f5ad6..c3b77419a9 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -66,6 +66,9 @@ sent_edus_by_type = Counter(
     ["type"],
 )
 
+# number of seconds to wait to batch up outgoing EDUs
+EDU_BATCH_TIME = 5.0
+
 
 class TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -119,6 +122,12 @@ class TransactionQueue(object):
         # Map of destination -> (edu_type, key) -> Edu
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
+        # In order to batch outgoing EDUs, we delay sending them. This records the time
+        # when we should send the next batch, by destination.
+        self.edu_tx_time_by_dest = {}
+
+        self.edu_tx_task_by_dest = {}
+
         LaterGauge(
             "synapse_federation_transaction_queue_pending_pdus",
             "",
@@ -380,7 +389,21 @@
         else:
             self.pending_edus_by_dest.setdefault(destination, []).append(edu)
 
-        self._attempt_new_transaction(destination)
+        if destination not in self.edu_tx_time_by_dest:
+            txtime = self.clock.time() + EDU_BATCH_TIME * 1000
+            self.edu_tx_time_by_dest[destination] = txtime
+
+        if destination in self.edu_tx_task_by_dest:
+            # we already have a job queued to send EDUs to this destination
+            return
+
+        def send_edus():
+            del self.edu_tx_task_by_dest[destination]
+            self._attempt_new_transaction(destination)
+
+        self.edu_tx_task_by_dest[destination] = self.clock.call_later(
+            EDU_BATCH_TIME, send_edus,
+        )
 
     def send_device_messages(self, destination):
         if destination == self.server_name:
@@ -405,6 +428,7 @@
         Returns:
             None
         """
+        # list of (pending_pdu, deferred, order)
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
@@ -458,18 +482,29 @@
             if leftover_pdus:
                 self.pending_pdus_by_dest[destination] = leftover_pdus
 
-            pending_edus = self.pending_edus_by_dest.pop(destination, [])
+            # if we have PDUs to send, we may as well send EDUs too. Otherwise,
+            # we only send EDUs if their delay is up
+            if destination in self.edu_tx_time_by_dest and (
+                pending_pdus or
+                self.clock.time() > self.edu_tx_time_by_dest[destination]
+            ):
+                del self.edu_tx_time_by_dest[destination]
 
-            # We can only include at most 100 EDUs per transactions
-            pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
-            if leftover_edus:
-                self.pending_edus_by_dest[destination] = leftover_edus
+                pending_edus = self.pending_edus_by_dest.pop(destination, [])
 
-            pending_presence = self.pending_presence_by_dest.pop(destination, {})
+                # We can only include at most 100 EDUs per transaction
+                pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
+                if leftover_edus:
+                    self.edu_tx_time_by_dest[destination] = self.clock.time()
+                    self.pending_edus_by_dest[destination] = leftover_edus
 
-            pending_edus.extend(
-                self.pending_edus_keyed_by_dest.pop(destination, {}).values()
-            )
+                pending_edus.extend(
+                    self.pending_edus_keyed_by_dest.pop(destination, {}).values()
+                )
+            else:
+                pending_edus = []
+
+            pending_presence = self.pending_presence_by_dest.pop(destination, {})
 
             pending_edus.extend(device_message_edus)
             if pending_presence:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..d14af54778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -196,7 +196,7 @@ class MessageHandler(object):
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
-        if requester.app_service and user_id not in users_with_profile:
+        if False and requester.app_service and user_id not in users_with_profile:
             for uid in users_with_profile:
                 if requester.app_service.is_interested_in_user(uid):
                     break
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index afa508d729..c5847def0f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,9 +44,12 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs, "room_list")
-        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
-                                                   timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(
+            hs, "room_list", timeout_ms=10 * 60 * 1000,
+        )
+        self.remote_response_cache = ResponseCache(
+            hs, "remote_room_list", timeout_ms=30 * 1000,
+        )
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 190ea2c7b1..e9b2e928e0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -66,6 +66,7 @@ class RoomMemberHandler(object):
         self.event_creation_handler = hs.get_event_creation_handler()
 
         self.member_linearizer = Linearizer(name="member")
+        self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
 
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
@@ -304,18 +305,37 @@
     ):
         key = (room_id,)
 
-        with (yield self.member_linearizer.queue(key)):
-            result = yield self._update_membership(
-                requester,
-                target,
-                room_id,
-                action,
-                txn_id=txn_id,
-                remote_room_hosts=remote_room_hosts,
-                third_party_signed=third_party_signed,
-                ratelimit=ratelimit,
-                content=content,
-            )
+        as_id = object()
+        if requester.app_service:
+            as_id = requester.app_service.id
+
+        then = self.clock.time_msec()
+
+        with (yield self.member_limiter.queue(as_id)):
+            diff = self.clock.time_msec() - then
+
+            if diff > 80 * 1000:
+                # haproxy would have timed the request out anyway...
+                raise SynapseError(504, "took too long to process")
+
+            with (yield self.member_linearizer.queue(key)):
+                diff = self.clock.time_msec() - then
+
+                if diff > 80 * 1000:
+                    # haproxy would have timed the request out anyway...
+                    raise SynapseError(504, "took too long to process")
+
+                result = yield self._update_membership(
+                    requester,
+                    target,
+                    room_id,
+                    action,
+                    txn_id=txn_id,
+                    remote_room_hosts=remote_room_hosts,
+                    third_party_signed=third_party_signed,
+                    ratelimit=ratelimit,
+                    content=content,
+                )
 
         defer.returnValue(result)
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd97241ab4..b4c4d89945 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,7 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
 
 # Counts the number of times we returned a non-empty sync. `type` is one of
 # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -208,7 +209,9 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs, "sync")
+        self.response_cache = ResponseCache(
+            hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS,
+        )
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
 
@@ -962,6 +965,15 @@
 
         yield self._generate_sync_entry_for_groups(sync_result_builder)
 
+        # debug for https://github.com/matrix-org/synapse/issues/4422
+        for joined_room in sync_result_builder.joined:
+            room_id = joined_room.room_id
+            if room_id in newly_joined_rooms:
+                logger.info(
+                    "Sync result for newly joined room %s: %r",
+                    room_id, joined_room,
+                )
+
         defer.returnValue(SyncResult(
             presence=sync_result_builder.presence,
             account_data=sync_result_builder.account_data,
@@ -1519,30 +1531,39 @@
 
         for room_id in sync_result_builder.joined_room_ids:
             room_entry = room_to_events.get(room_id, None)
 
+            newly_joined = room_id in newly_joined_rooms
             if room_entry:
                 events, start_key = room_entry
 
                 prev_batch_token = now_token.copy_and_replace("room_key", start_key)
 
-                room_entries.append(RoomSyncResultBuilder(
+                entry = RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="joined",
                     events=events,
-                    newly_joined=room_id in newly_joined_rooms,
+                    newly_joined=newly_joined,
                     full_state=False,
-                    since_token=None if room_id in newly_joined_rooms else since_token,
+                    since_token=None if newly_joined else since_token,
                     upto_token=prev_batch_token,
-                ))
+                )
             else:
-                room_entries.append(RoomSyncResultBuilder(
+                entry = RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="joined",
                     events=[],
-                    newly_joined=room_id in newly_joined_rooms,
+                    newly_joined=newly_joined,
                     full_state=False,
                     since_token=since_token,
                     upto_token=since_token,
-                ))
+                )
+
+            if newly_joined:
+                # debugging for https://github.com/matrix-org/synapse/issues/4422
+                logger.info(
+                    "RoomSyncResultBuilder events for newly joined room %s: %r",
+                    room_id, entry.events,
+                )
+            room_entries.append(entry)
 
         defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
 
@@ -1663,6 +1684,13 @@
             newly_joined_room=newly_joined,
         )
 
+        if newly_joined:
+            # debug for https://github.com/matrix-org/synapse/issues/4422
+            logger.info(
+                "Timeline events after filtering in newly-joined room %s: %r",
+                room_id, batch,
+            )
+
         # When we join the room (or the client requests full_state), we should
         # send down any existing tags. Usually the user won't have tags in a
         # newly joined room, unless either a) they've joined before or b) the
@@ -1894,7 +1922,12 @@ def _calculate_state(
 
 class SyncResultBuilder(object):
-    "Used to help build up a new SyncResult for a user"
+    """Used to help build up a new SyncResult for a user
+
+    Attributes:
+        joined (list[JoinedSyncResult]):
+        archived (list[ArchivedSyncResult]):
+    """
 
     def __init__(self, sync_config, full_state, since_token, now_token,
                  joined_room_ids):
         """
@@ -1903,6 +1936,7 @@ class SyncResultBuilder(object):
             full_state(bool): The full_state flag as specified by user
             since_token(StreamToken): The token supplied by user, or None.
             now_token(StreamToken): The token to sync up to.
+            joined_room_ids(list[str]): List of rooms the user is joined to
         """
         self.sync_config = sync_config
         self.full_state = full_state
@@ -1930,7 +1964,7 @@ class RoomSyncResultBuilder(object):
         Args:
             room_id(str)
             rtype(str): One of `"joined"` or `"archived"`
-            events(list): List of events to include in the room, (more events
+            events(list[FrozenEvent]): List of events to include in the room (more events
                 may be added when generating result).
             newly_joined(bool): If the user has newly joined the room
             full_state(bool): Whether the full state should be sent in result
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..f5c3ba23a6 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+import synapse.metrics
 
 from six import iteritems
 
@@ -28,7 +29,6 @@ from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
-
 class UserDirectoryHandler(object):
     """Handles querying of and keeping updated the user_directory.
 
@@ -130,7 +130,7 @@ class UserDirectoryHandler(object):
         # Support users are for diagnostics and should not appear in the user directory.
         if not is_support:
             yield self.store.update_profile_in_user_dir(
-                user_id, profile.display_name, profile.avatar_url, None
+                user_id, profile.display_name, profile.avatar_url, None,
             )
 
     @defer.inlineCallbacks
@@ -166,9 +166,8 @@ class UserDirectoryHandler(object):
             self.pos = deltas[-1]["stream_id"]
 
             # Expose current event processing position to prometheus
-            synapse.metrics.event_processing_positions.labels("user_dir").set(
-                self.pos
-            )
+            synapse.metrics.event_processing_positions.labels(
+                "user_dir").set(self.pos)
 
             yield self.store.update_user_directory_stream_pos(self.pos)
 
@@ -192,25 +191,21 @@ class UserDirectoryHandler(object):
                 logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
                 yield self._handle_initial_room(room_id)
                 num_processed_rooms += 1
-                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
         if self.search_all_users:
             num_processed_users = 0
             user_ids = yield self.store.get_all_local_users()
-            logger.info(
-                "Doing initial update of user directory. %d users", len(user_ids)
-            )
+            logger.info("Doing initial update of user directory. %d users", len(user_ids))
             for user_id in user_ids:
                 # We add profiles for all users even if they don't match the
                 # include pattern, just in case we want to change it in future
-                logger.info(
-                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
-                )
+                logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
                 yield self._handle_local_user(user_id)
                 num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
+                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
 
         logger.info("Processed all users")
 
@@ -229,24 +224,24 @@ class UserDirectoryHandler(object):
         if not is_in_room:
             return
 
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-            room_id
-        )
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
 
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
         user_ids = set(users_with_profile)
         unhandled_users = user_ids - self.initially_handled_users
 
         yield self.store.add_profiles_to_user_dir(
-            room_id,
-            {user_id: users_with_profile[user_id] for user_id in unhandled_users},
+            room_id, {
+                user_id: users_with_profile[user_id] for user_id in unhandled_users
+            }
         )
 
         self.initially_handled_users |= unhandled_users
 
         if is_public:
             yield self.store.add_users_to_public_room(
-                room_id, user_ids=user_ids - self.initially_handled_users_in_public
+                room_id,
+                user_ids=user_ids - self.initially_handled_users_in_public
             )
             self.initially_handled_users_in_public |= user_ids
 
@@ -258,7 +253,7 @@ class UserDirectoryHandler(object):
         count = 0
         for user_id in user_ids:
             if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
             if not self.is_mine_id(user_id):
                 count += 1
@@ -273,7 +268,7 @@ class UserDirectoryHandler(object):
                     continue
 
                 if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
+                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
                 count += 1
 
                 user_set = (user_id, other_user_id)
@@ -295,23 +290,25 @@ class UserDirectoryHandler(object):
 
                 if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.add_users_who_share_room(
-                        room_id, not is_public, to_insert
+                        room_id, not is_public, to_insert,
                     )
                     to_insert.clear()
 
                 if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.update_users_who_share_room(
-                        room_id, not is_public, to_update
+                        room_id, not is_public, to_update,
                     )
                     to_update.clear()
 
         if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
             to_insert.clear()
 
         if to_update:
             yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
+                room_id, not is_public, to_update,
             )
             to_update.clear()
 
@@ -332,42 +329,21 @@ class UserDirectoryHandler(object):
             # may have become public or not and add/remove the users in said room
             if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
                 yield self._handle_room_publicity_change(
-                    room_id, prev_event_id, event_id, typ
+                    room_id, prev_event_id, event_id, typ,
                 )
             elif typ == EventTypes.Member:
                 change = yield self._get_key_change(
-                    prev_event_id,
-                    event_id,
+                    prev_event_id, event_id,
                     key_name="membership",
                     public_value=Membership.JOIN,
                 )
 
-                if change is False:
-                    # Need to check if the server left the room entirely, if so
-                    # we might need to remove all the users in that room
-                    is_in_room = yield self.store.is_host_joined(
-                        room_id, self.server_name
-                    )
-                    if not is_in_room:
-                        logger.info("Server left room: %r", room_id)
-                        # Fetch all the users that we marked as being in user
-                        # directory due to being in the room and then check if
-                        # need to remove those users or not
-                        user_ids = yield self.store.get_users_in_dir_due_to_room(
-                            room_id
-                        )
-                        for user_id in user_ids:
-                            yield self._handle_remove_user(room_id, user_id)
-                        return
-                    else:
-                        logger.debug("Server is still in room: %r", room_id)
-
                 is_support = yield self.store.is_support_user(state_key)
                 if not is_support:
                     if change is None:
                         # Handle any profile changes
                         yield self._handle_profile_change(
-                            state_key, room_id, prev_event_id, event_id
+                            state_key, room_id, prev_event_id, event_id,
                         )
                         continue
 
@@ -399,15 +375,13 @@ class UserDirectoryHandler(object):
         if typ == EventTypes.RoomHistoryVisibility:
             change = yield self._get_key_change(
-                prev_event_id,
-                event_id,
+                prev_event_id, event_id,
                 key_name="history_visibility",
                 public_value="world_readable",
             )
         elif typ == EventTypes.JoinRules:
             change = yield self._get_key_change(
-                prev_event_id,
-                event_id,
+                prev_event_id, event_id,
                 key_name="join_rule",
                 public_value=JoinRules.PUBLIC,
             )
 
@@ -532,7 +506,7 @@ class UserDirectoryHandler(object):
                 )
                 if self.is_mine_id(other_user_id) and not is_appservice:
                     shared_is_private = yield self.store.get_if_users_share_a_room(
-                        other_user_id, user_id
+                        other_user_id, user_id,
                    )
                     if shared_is_private is True:
                         # We've already marked in the database they share a private room
@@ -547,11 +521,13 @@ class UserDirectoryHandler(object):
                     to_insert.add((other_user_id, user_id))
 
         if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
 
         if to_update:
             yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
+                room_id, not is_public, to_update,
             )
 
     @defer.inlineCallbacks
@@ -570,15 +546,15 @@ class UserDirectoryHandler(object):
             row = yield self.store.get_user_in_public_room(user_id)
             update_user_in_public = row and row["room_id"] == room_id
 
-        if update_user_in_public or update_user_dir:
+        if (update_user_in_public or update_user_dir):
             # XXX: Make this faster?
             rooms = yield self.store.get_rooms_for_user(user_id)
             for j_room_id in rooms:
-                if not update_user_in_public and not update_user_dir:
+                if (not update_user_in_public and not update_user_dir):
                     break
 
                 is_in_room = yield self.store.is_host_joined(
-                    j_room_id, self.server_name
+                    j_room_id, self.server_name,
                 )
 
                 if not is_in_room:
@@ -606,19 +582,19 @@ class UserDirectoryHandler(object):
         # Get a list of user tuples that were in the DB due to this room and
         # users (this includes tuples where the other user matches `user_id`)
         user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
-            user_id, room_id
+            user_id, room_id,
        )
 
         for user_id, other_user_id in user_tuples:
             # For each user tuple get a list of rooms that they still share,
             # trying to find a private room, and update the entry in the DB
-            rooms = yield self.store.get_rooms_in_common_for_users(
-                user_id, other_user_id
-            )
+            rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
 
             # If they dont share a room anymore, remove the mapping
             if not rooms:
-                yield self.store.remove_user_who_share_room(user_id, other_user_id)
+                yield self.store.remove_user_who_share_room(
+                    user_id, other_user_id,
+                )
                 continue
 
             found_public_share = None
@@ -632,13 +608,13 @@ class UserDirectoryHandler(object):
                 else:
                     found_public_share = None
                     yield self.store.update_users_who_share_room(
-                        room_id, not is_public, [(user_id, other_user_id)]
+                        room_id, not is_public, [(user_id, other_user_id)],
                    )
                     break
 
         if found_public_share:
             yield self.store.update_users_who_share_room(
-                room_id, not is_public, [(user_id, other_user_id)]
+                room_id, not is_public, [(user_id, other_user_id)],
            )
 
    @defer.inlineCallbacks
@@ -666,7 +642,7 @@ class UserDirectoryHandler(object):
 
         if prev_name != new_name or prev_avatar != new_avatar:
             yield self.store.update_profile_in_user_dir(
-                user_id, new_name, new_avatar, room_id
+                user_id, new_name, new_avatar, room_id,
            )
 
    @defer.inlineCallbacks
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index e65f8c63d3..ffae376915 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -107,6 +107,10 @@ class HttpPusher(object):
                 "'url' required in data for HTTP pusher"
             )
         self.url = self.data['url']
+        self.url = self.url.replace(
+            "https://matrix.org/_matrix/push/v1/notify",
+            "http://http-priv.matrix.org/_matrix/push/v1/notify",
+        )
         self.http_client = hs.get_simple_http_client()
         self.data_minus_url = {}
         self.data_minus_url.update(self.data)
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 60641f1a49..5b8521c770 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore):
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
             return
 
+        self.client_ip_last_seen.prefill(key, now)
+
         self.hs.get_tcp_replication().send_user_ip(
             user_id, access_token, ip, user_agent, device_id, now
         )
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 429471c345..530bd3756c 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -52,6 +52,7 @@ indicate which side is sending, these are *not* included on the wire::
 import fcntl
 import logging
 import struct
+import traceback
 from collections import defaultdict
 
 from six import iteritems, iterkeys
@@ -241,6 +242,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
     def send_error(self, error_string, *args):
         """Send an error to remote and close the connection.
         """
+        logger.error("[%s] Sending error: %s", self.id(), error_string % args)
         self.send_command(ErrorCommand(error_string % args))
         self.close()
 
@@ -333,6 +335,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             we or the remote has closed the connection)
         """
         logger.info("[%s] Stop producing", self.id())
+        # debug for #4733
+        logger.info("Traceback: %s", "".join(traceback.format_stack()))
         self.on_connection_closed()
 
     def connectionLost(self, reason):
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index c1e626be3f..d49973634e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -32,7 +32,7 @@ from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
-MAX_EVENTS_BEHIND = 10000
+MAX_EVENTS_BEHIND = 500000
 
 
 EventStreamRow = namedtuple("EventStreamRow", (
@@ -265,8 +265,8 @@ class PresenceStream(Stream):
         store = hs.get_datastore()
         presence_handler = hs.get_presence_handler()
 
-        self.current_token = store.get_current_presence_token
-        self.update_function = presence_handler.get_all_presence_updates
+        self.current_token = lambda: 0
+        self.update_function = lambda _a, _b: []
 
         super(PresenceStream, self).__init__(hs)
 
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 82433a2aa9..2e303264f6 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -466,17 +466,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
         )
         new_room_id = info["room_id"]
 
-        yield self.event_creation_handler.create_and_send_nonmember_event(
-            room_creator_requester,
-            {
-                "type": "m.room.message",
-                "content": {"body": message, "msgtype": "m.text"},
-                "room_id": new_room_id,
-                "sender": new_room_user_id,
-            },
-            ratelimit=False,
-        )
-
         requester_user_id = requester.user.to_string()
 
         logger.info("Shutting down room %r", room_id)
@@ -514,6 +503,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
 
             kicked_users.append(user_id)
 
+        yield self.event_creation_handler.create_and_send_nonmember_event(
+            room_creator_requester,
+            {
+                "type": "m.room.message",
+                "content": {"body": message, "msgtype": "m.text"},
+                "room_id": new_room_id,
+                "sender": new_room_user_id,
+            },
+            ratelimit=False,
+        )
+
         aliases_for_room = yield self.store.get_aliases_for_room(room_id)
 
         yield self.store.update_aliases_for_room(
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 9c21362226..1adfee8c0a 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
 # times give more inserts into the database even for readonly API hits
 # 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
 
 
 class ClientIpStore(background_updates.BackgroundUpdateStore):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 592c1bcd33..2874dabbd1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -72,7 +72,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
             defer.returnValue(hosts)
 
-    @cached(max_entries=100000, iterable=True)
+    @cachedInlineCallbacks(max_entries=100000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
             sql = (
@@ -86,7 +86,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             txn.execute(sql, (room_id, Membership.JOIN,))
             return [to_ascii(r[0]) for r in txn]
 
-        return self.runInteraction("get_users_in_room", f)
+        start_time = self._clock.time_msec()
+        result = yield self.runInteraction("get_users_in_room", f)
+        end_time = self._clock.time_msec()
+        logger.info(
+            "Fetched room membership for %s (%i users) in %i ms",
+            room_id, len(result), end_time - start_time,
+        )
+        defer.returnValue(result)
 
     @cached(max_entries=100000)
     def get_room_summary(self, room_id):
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index c6420b2374..ad01071318 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -730,7 +730,7 @@ def _parse_query(database_engine, search_term):
     results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
 
     if isinstance(database_engine, PostgresEngine):
-        return " & ".join(result + ":*" for result in results)
+        return " & ".join(result for result in results)
     elif isinstance(database_engine, Sqlite3Engine):
         return " & ".join(result + "*" for result in results)
     else:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d6cfdba519..b5aa849f4c 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -191,6 +191,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     @defer.inlineCallbacks
     def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
                                          order='DESC'):
+        """
+
+        Args:
+            room_ids:
+            from_key:
+            to_key:
+            limit:
+            order:
+
+        Returns:
+            Deferred[dict[str,tuple[list[FrozenEvent], str]]]
+                A map from room id to a tuple containing:
+                    - list of recent events in the room
+                    - stream ordering key for the start of the chunk of events returned.
+        """
         from_id = RoomStreamToken.parse_stream_token(from_key).stream
 
         room_ids = yield self._events_stream_cache.get_entities_changed(
diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index a824be9a62..5005fd08d3 100644
--- a/tests/rest/client/v1/test_rooms.py
+++ b/tests/rest/client/v1/test_rooms.py
@@ -761,11 +761,13 @@ class RoomInitialSyncTestCase(RoomBase):
 
         self.assertTrue("presence" in channel.json_body)
 
-        presence_by_user = {
-            e["content"]["user_id"]: e for e in channel.json_body["presence"]
-        }
-        self.assertTrue(self.user_id in presence_by_user)
-        self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
+        # presence is turned off on hotfixes
+
+        # presence_by_user = {
+        #     e["content"]["user_id"]: e for e in channel.json_body["presence"]
+        # }
+        # self.assertTrue(self.user_id in presence_by_user)
+        # self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
 
 
 class RoomMessageListTestCase(RoomBase):