From 0cbda53819dd66df05c872ea021767336b457769 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 27 Jan 2018 08:48:41 +0000 Subject: Rename resolve_state_groups -> resolve_state_groups_for_events (to make way for a method that actually just does the state group resolution) --- synapse/handlers/federation.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 677532c87b..8ee9434c9b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -808,13 +808,12 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") + resolve = logcontext.preserve_fn( + self.state_handler.resolve_state_groups_for_events + ) states = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.preserve_fn(self.state_handler.resolve_state_groups)( - room_id, [e] - ) - for e in event_ids - ], consumeErrors=True, + [resolve(room_id, [e]) for e in event_ids], + consumeErrors=True, )) states = dict(zip(event_ids, [s.state for s in states])) -- cgit 1.5.1 From 5ff3d23564f41e3ae82398a2f0726d8914d060a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jan 2018 16:51:53 +0000 Subject: Split event creation into a separate handler --- synapse/handlers/message.py | 306 +++++++++++++++++++++++--------------------- 1 file changed, 160 insertions(+), 146 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21f1717dd2..afa19bf653 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -47,21 +47,9 @@ class MessageHandler(BaseHandler): self.hs = hs self.state = hs.get_state_handler() self.clock = hs.get_clock() - self.validator = EventValidator() - self.profile_handler = hs.get_profile_handler() self.pagination_lock = ReadWriteLock() - self.pusher_pool = hs.get_pusherpool() - - # We arbitrarily limit concurrent event creation for a room to 5. - # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) - - self.action_generator = hs.get_action_generator() - - self.spam_checker = hs.get_spam_checker() - @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -182,6 +170,162 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) + @defer.inlineCallbacks + def get_room_data(self, user_id=None, room_id=None, + event_type=None, state_key="", is_guest=False): + """ Get data from a room. + + Args: + event : The room path event + Returns: + The path data content. + Raises: + SynapseError if something went wrong. + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + data = yield self.state_handler.get_current_state( + room_id, event_type, state_key + ) + elif membership == Membership.LEAVE: + key = (event_type, state_key) + room_state = yield self.store.get_state_for_events( + [membership_event_id], [key] + ) + data = room_state[membership_event_id].get(key) + + defer.returnValue(data) + + @defer.inlineCallbacks + def _check_in_room_or_world_readable(self, room_id, user_id): + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError: + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) + + @defer.inlineCallbacks + def get_state_events(self, user_id, room_id, is_guest=False): + """Retrieve all state events for a given room. If the user is + joined to the room then return the current state. If the user has + left the room return the state events from when they left. + + Args: + user_id(str): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A list of dicts representing state events. [{}, {}, {}] + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + room_state = yield self.state_handler.get_current_state(room_id) + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], None + ) + room_state = room_state[membership_event_id] + + now = self.clock.time_msec() + defer.returnValue( + [serialize_event(c, now) for c in room_state.values()] + ) + + @defer.inlineCallbacks + def get_joined_members(self, requester, room_id): + """Get all the joined members in the room and their profile information. + + If the user has left the room return the state events from when they left. + + Args: + requester(Requester): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A dict of user_id to profile info + """ + user_id = requester.user.to_string() + if not requester.app_service: + # We check AS auth after fetching the room membership, as it + # requires us to pull out all joined members anyway. + membership, _ = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + if membership != Membership.JOIN: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # If this is an AS, double check that they are allowed to see the members. + # This can either be because the AS user is in the room or becuase there + # is a user in the room that the AS is "interested in" + if requester.app_service and user_id not in users_with_profile: + for uid in users_with_profile: + if requester.app_service.is_interested_in_user(uid): + break + else: + # Loop fell through, AS has no interested users in room + raise AuthError(403, "Appservice not in room") + + defer.returnValue({ + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in users_with_profile.iteritems() + }) + + +class EventCreationHandler(object): + def __init__(self, hs): + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.clock = hs.get_clock() + self.validator = EventValidator() + self.profile_handler = hs.get_profile_handler() + self.event_builder_factory = hs.get_event_builder_factory() + self.server_name = hs.hostname + self.ratelimiter = hs.get_ratelimiter() + self.notifier = hs.get_notifier() + + # This is only used to get at ratelimit function, and maybe_kick_guest_users + self.base_handler = BaseHandler(hs) + + self.pusher_pool = hs.get_pusherpool() + + # We arbitrarily limit concurrent event creation for a room to 5. + # This is to stop us from diverging history *too* much. + self.limiter = Limiter(max_count=5) + + self.action_generator = hs.get_action_generator() + + self.spam_checker = hs.get_spam_checker() + @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, prev_event_ids=None): @@ -262,7 +406,7 @@ class MessageHandler(BaseHandler): # We check here if we are currently being rate limited, so that we # don't do unnecessary work. We check again just before we actually # send the event. - yield self.ratelimit(requester, update=False) + yield self.base_handler.ratelimit(requester, update=False) user = UserID.from_string(event.sender) @@ -342,134 +486,6 @@ class MessageHandler(BaseHandler): ) defer.returnValue(event) - @defer.inlineCallbacks - def get_room_data(self, user_id=None, room_id=None, - event_type=None, state_key="", is_guest=False): - """ Get data from a room. - - Args: - event : The room path event - Returns: - The path data content. - Raises: - SynapseError if something went wrong. - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - data = yield self.state_handler.get_current_state( - room_id, event_type, state_key - ) - elif membership == Membership.LEAVE: - key = (event_type, state_key) - room_state = yield self.store.get_state_for_events( - [membership_event_id], [key] - ) - data = room_state[membership_event_id].get(key) - - defer.returnValue(data) - - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - - @defer.inlineCallbacks - def get_state_events(self, user_id, room_id, is_guest=False): - """Retrieve all state events for a given room. If the user is - joined to the room then return the current state. If the user has - left the room return the state events from when they left. - - Args: - user_id(str): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A list of dicts representing state events. [{}, {}, {}] - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) - elif membership == Membership.LEAVE: - room_state = yield self.store.get_state_for_events( - [membership_event_id], None - ) - room_state = room_state[membership_event_id] - - now = self.clock.time_msec() - defer.returnValue( - [serialize_event(c, now) for c in room_state.values()] - ) - - @defer.inlineCallbacks - def get_joined_members(self, requester, room_id): - """Get all the joined members in the room and their profile information. - - If the user has left the room return the state events from when they left. - - Args: - requester(Requester): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A dict of user_id to profile info - """ - user_id = requester.user.to_string() - if not requester.app_service: - # We check AS auth after fetching the room membership, as it - # requires us to pull out all joined members anyway. - membership, _ = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - if membership != Membership.JOIN: - raise NotImplementedError( - "Getting joined members after leaving is not implemented" - ) - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - - # If this is an AS, double check that they are allowed to see the members. - # This can either be because the AS user is in the room or becuase there - # is a user in the room that the AS is "interested in" - if requester.app_service and user_id not in users_with_profile: - for uid in users_with_profile: - if requester.app_service.is_interested_in_user(uid): - break - else: - # Loop fell through, AS has no interested users in room - raise AuthError(403, "Appservice not in room") - - defer.returnValue({ - user_id: { - "avatar_url": profile.avatar_url, - "display_name": profile.display_name, - } - for user_id, profile in users_with_profile.iteritems() - }) - @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): @@ -509,9 +525,7 @@ class MessageHandler(BaseHandler): builder.prev_events = prev_events builder.depth = depth - state_handler = self.state_handler - - context = yield state_handler.compute_event_context(builder) + context = yield self.state.compute_event_context(builder) if requester: context.app_service = requester.app_service @@ -551,7 +565,7 @@ class MessageHandler(BaseHandler): # We now need to go and hit out to wherever we need to hit out to. if ratelimit: - yield self.ratelimit(requester) + yield self.base_handler.ratelimit(requester) try: yield self.auth.check_from_context(event, context) @@ -567,7 +581,7 @@ class MessageHandler(BaseHandler): logger.exception("Failed to encode content: %r", event.content) raise - yield self.maybe_kick_guest_users(event, context) + yield self.base_handler.maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: # Check the alias is acually valid (at this time at least) -- cgit 1.5.1 From 3fa362502cc6c509bac65753954c313d307035e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jan 2018 16:52:07 +0000 Subject: Update places where we create events --- synapse/handlers/directory.py | 7 +++---- synapse/handlers/federation.py | 18 ++++++++---------- synapse/handlers/room.py | 10 ++++------ synapse/handlers/room_member.py | 20 +++++++++++--------- synapse/rest/client/v1/admin.py | 4 ++-- synapse/rest/client/v1/room.py | 16 +++++++++------- synapse/server.py | 5 +++++ 7 files changed, 42 insertions(+), 38 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index a0464ae5c0..8580ada60a 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -34,6 +34,7 @@ class DirectoryHandler(BaseHandler): self.state = hs.get_state_handler() self.appservice_handler = hs.get_application_service_handler() + self.event_creation_handler = hs.get_event_creation_handler() self.federation = hs.get_replication_layer() self.federation.register_query_handler( @@ -249,8 +250,7 @@ class DirectoryHandler(BaseHandler): def send_room_alias_update_event(self, requester, user_id, room_id): aliases = yield self.store.get_aliases_for_room(room_id) - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Aliases, @@ -272,8 +272,7 @@ class DirectoryHandler(BaseHandler): if not alias_event or alias_event.content.get("alias", "") != alias_str: return - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.CanonicalAlias, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8ee9434c9b..e6b9f5cf53 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -75,6 +75,7 @@ class FederationHandler(BaseHandler): self.is_mine_id = hs.is_mine_id self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() + self.event_creation_handler = hs.get_event_creation_handler() self.replication_layer.set_handler(self) @@ -1007,8 +1008,7 @@ class FederationHandler(BaseHandler): }) try: - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler._create_new_client_event( builder=builder, ) except AuthError as e: @@ -1248,8 +1248,7 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler._create_new_client_event( builder=builder, ) @@ -2120,8 +2119,7 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler._create_new_client_event( builder=builder ) @@ -2159,8 +2157,7 @@ class FederationHandler(BaseHandler): """ builder = self.event_builder_factory.new(event_dict) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler._create_new_client_event( builder=builder, ) @@ -2210,8 +2207,9 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event(builder=builder) + event, context = yield self.event_creation_handler._create_new_client_event( + builder=builder, + ) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d1cc87a016..4ea5bf1bcf 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -64,6 +64,7 @@ class RoomCreationHandler(BaseHandler): super(RoomCreationHandler, self).__init__(hs) self.spam_checker = hs.get_spam_checker() + self.event_creation_handler = hs.get_event_creation_handler() @defer.inlineCallbacks def create_room(self, requester, config, ratelimit=True): @@ -163,13 +164,11 @@ class RoomCreationHandler(BaseHandler): creation_content = config.get("creation_content", {}) - msg_handler = self.hs.get_handlers().message_handler room_member_handler = self.hs.get_handlers().room_member_handler yield self._send_events_for_new_room( requester, room_id, - msg_handler, room_member_handler, preset_config=preset_config, invite_list=invite_list, @@ -181,7 +180,7 @@ class RoomCreationHandler(BaseHandler): if "name" in config: name = config["name"] - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Name, @@ -194,7 +193,7 @@ class RoomCreationHandler(BaseHandler): if "topic" in config: topic = config["topic"] - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Topic, @@ -249,7 +248,6 @@ class RoomCreationHandler(BaseHandler): self, creator, # A Requester object. room_id, - msg_handler, room_member_handler, preset_config, invite_list, @@ -272,7 +270,7 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def send(etype, content, **kwargs): event = create(etype, content, **kwargs) - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7e6467cd1d..ab58beb0f5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -46,6 +46,7 @@ class RoomMemberHandler(BaseHandler): super(RoomMemberHandler, self).__init__(hs) self.profile_handler = hs.get_profile_handler() + self.event_creation_hander = hs.get_event_creation_handler() self.member_linearizer = Linearizer(name="member") @@ -66,13 +67,12 @@ class RoomMemberHandler(BaseHandler): ): if content is None: content = {} - msg_handler = self.hs.get_handlers().message_handler content["membership"] = membership if requester.is_guest: content["kind"] = "guest" - event, context = yield msg_handler.create_event( + event, context = yield self.event_creation_hander.create_event( requester, { "type": EventTypes.Member, @@ -90,12 +90,14 @@ class RoomMemberHandler(BaseHandler): ) # Check if this event matches the previous membership event for the user. - duplicate = yield msg_handler.deduplicate_state_event(event, context) + duplicate = yield self.event_creation_hander.deduplicate_state_event( + event, context, + ) if duplicate is not None: # Discard the new event since this membership change is a no-op. defer.returnValue(duplicate) - yield msg_handler.handle_new_client_event( + yield self.event_creation_hander.handle_new_client_event( requester, event, context, @@ -394,8 +396,9 @@ class RoomMemberHandler(BaseHandler): else: requester = synapse.types.create_requester(target_user) - message_handler = self.hs.get_handlers().message_handler - prev_event = yield message_handler.deduplicate_state_event(event, context) + prev_event = yield self.event_creation_hander.deduplicate_state_event( + event, context, + ) if prev_event is not None: return @@ -412,7 +415,7 @@ class RoomMemberHandler(BaseHandler): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - yield message_handler.handle_new_client_event( + yield self.event_creation_hander.handle_new_client_event( requester, event, context, @@ -644,8 +647,7 @@ class RoomMemberHandler(BaseHandler): ) ) - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_hander.create_and_send_nonmember_event( requester, { "type": EventTypes.ThirdPartyInvite, diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 0615e5d807..f77f646670 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -171,6 +171,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): self.store = hs.get_datastore() self.handlers = hs.get_handlers() self.state = hs.get_state_handler() + self.event_creation_handler = hs.get_event_creation_handler() @defer.inlineCallbacks def on_POST(self, request, room_id): @@ -203,8 +204,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): ) new_room_id = info["room_id"] - msg_handler = self.handlers.message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( room_creator_requester, { "type": "m.room.message", diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 867ec8602c..ad6534537a 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -82,6 +82,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomStateEventRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.event_creation_hander = hs.get_event_creation_handler() def register(self, http_server): # /room/$roomid/state/$eventtype @@ -162,15 +163,16 @@ class RoomStateEventRestServlet(ClientV1RestServlet): content=content, ) else: - msg_handler = self.handlers.message_handler - event, context = yield msg_handler.create_event( + event, context = yield self.event_creation_hander.create_event( requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id, ) - yield msg_handler.send_nonmember_event(requester, event, context) + yield self.event_creation_hander.send_nonmember_event( + requester, event, context, + ) ret = {} if event: @@ -184,6 +186,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomSendEventRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.event_creation_hander = hs.get_event_creation_handler() def register(self, http_server): # /rooms/$roomid/send/$event_type[/$txn_id] @@ -205,8 +208,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet): if 'ts' in request.args and requester.app_service: event_dict['origin_server_ts'] = parse_integer(request, "ts", 0) - msg_handler = self.handlers.message_handler - event = yield msg_handler.create_and_send_nonmember_event( + event = yield self.event_creation_hander.create_and_send_nonmember_event( requester, event_dict, txn_id=txn_id, @@ -670,6 +672,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomRedactEventRestServlet, self).__init__(hs) self.handlers = hs.get_handlers() + self.event_creation_handler = hs.get_event_creation_handler() def register(self, http_server): PATTERNS = ("/rooms/(?P[^/]*)/redact/(?P[^/]*)") @@ -680,8 +683,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request) content = parse_json_object_from_request(request) - msg_handler = self.handlers.message_handler - event = yield msg_handler.create_and_send_nonmember_event( + event = yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Redaction, diff --git a/synapse/server.py b/synapse/server.py index 3173aed1d0..fbd602d40e 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -55,6 +55,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.user_directory import UserDirectoryHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.profile import ProfileHandler +from synapse.handlers.message import EventCreationHandler from synapse.groups.groups_server import GroupsServerHandler from synapse.groups.attestations import GroupAttestionRenewer, GroupAttestationSigning from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory @@ -118,6 +119,7 @@ class HomeServer(object): 'application_service_handler', 'device_message_handler', 'profile_handler', + 'event_creation_handler', 'deactivate_account_handler', 'set_password_handler', 'notifier', @@ -276,6 +278,9 @@ class HomeServer(object): def build_profile_handler(self): return ProfileHandler(self) + def build_event_creation_handler(self): + return EventCreationHandler(self) + def build_deactivate_account_handler(self): return DeactivateAccountHandler(self) -- cgit 1.5.1 From 3d33eef6fcbba474664a9bccdcb8822c6f72ee8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Feb 2018 14:31:24 +0000 Subject: Store state groups separately from events (#2784) * Split state group persist into seperate storage func * Add per database engine code for state group id gen * Move store_state_group to StateReadStore This allows other workers to use it, and so resolve state. * Hook up store_state_group * Fix tests * Rename _store_mult_state_groups_txn * Rename StateGroupReadStore * Remove redundant _have_persisted_state_group_txn * Update comments * Comment compute_event_context * Set start val for state_group_id_seq ... otherwise we try to recreate old state groups * Update comments * Don't store state for outliers * Update comment * Update docstring as state groups are ints --- synapse/events/snapshot.py | 4 +- synapse/handlers/federation.py | 24 ++- synapse/replication/slave/storage/events.py | 4 +- synapse/state.py | 56 +++++- synapse/storage/__init__.py | 1 - synapse/storage/engines/postgres.py | 6 + synapse/storage/engines/sqlite3.py | 19 ++ synapse/storage/events.py | 10 +- synapse/storage/schema/delta/47/state_group_seq.py | 37 ++++ synapse/storage/state.py | 196 +++++++++++---------- tests/replication/slave/storage/test_events.py | 4 +- tests/test_state.py | 154 +++++++++------- 12 files changed, 326 insertions(+), 189 deletions(-) create mode 100644 synapse/storage/schema/delta/47/state_group_seq.py (limited to 'synapse/handlers') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e9a732ff03..87e3fe7b97 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -25,7 +25,9 @@ class EventContext(object): The current state map excluding the current event. (type, state_key) -> event_id - state_group (int): state group id + state_group (int|None): state group id, if the state has been stored + as a state group. This is usually only None if e.g. the event is + an outlier. rejected (bool|str): A rejection reason if the event was rejected, else False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8ee9434c9b..643e813b1f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1831,8 +1831,8 @@ class FederationHandler(BaseHandler): current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state - self._update_context_for_auth_events( - context, auth_events, event_key, + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, ) if different_auth and not event.internal_metadata.is_outlier(): @@ -1913,8 +1913,8 @@ class FederationHandler(BaseHandler): # 4. Look at rejects and their proofs. # TODO. - self._update_context_for_auth_events( - context, auth_events, event_key, + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, ) try: @@ -1923,11 +1923,15 @@ class FederationHandler(BaseHandler): logger.warn("Failed auth resolution for %r because %s", event, e) raise e - def _update_context_for_auth_events(self, context, auth_events, + @defer.inlineCallbacks + def _update_context_for_auth_events(self, event, context, auth_events, event_key): - """Update the state_ids in an event context after auth event resolution + """Update the state_ids in an event context after auth event resolution, + storing the changes as a new state group. Args: + event (Event): The event we're handling the context for + context (synapse.events.snapshot.EventContext): event context to be updated @@ -1950,7 +1954,13 @@ class FederationHandler(BaseHandler): context.prev_state_ids.update({ k: a.event_id for k, a in auth_events.iteritems() }) - context.state_group = self.store.get_next_state_group() + context.state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=context.prev_group, + delta_ids=context.delta_ids, + current_state_ids=context.current_state_ids, + ) @defer.inlineCallbacks def construct_auth_difference(self, local_auth, remote_auth): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 29d7296b43..8acb5df0f3 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -19,7 +19,7 @@ from synapse.storage import DataStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.roommember import RoomMemberStore -from synapse.storage.state import StateGroupReadStore +from synapse.storage.state import StateGroupWorkerStore from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache from ._base import BaseSlavedStore @@ -37,7 +37,7 @@ logger = logging.getLogger(__name__) # the method descriptor on the DataStore and chuck them into our class. -class SlavedEventStore(StateGroupReadStore, BaseSlavedStore): +class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedEventStore, self).__init__(db_conn, hs) diff --git a/synapse/state.py b/synapse/state.py index 273f9911ca..cc93bbcb6b 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -183,8 +183,15 @@ class StateHandler(object): def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. + This works out what the current state should be for the event, and + generates a new state group if necessary. + Args: event (synapse.events.EventBase): + old_state (dict|None): The state at the event if it can't be + calculated from existing events. This is normally only specified + when receiving an event from federation where we don't have the + prev events for, e.g. when backfilling. Returns: synapse.events.snapshot.EventContext: """ @@ -208,15 +215,22 @@ class StateHandler(object): context.current_state_ids = {} context.prev_state_ids = {} context.prev_state_events = [] - context.state_group = self.store.get_next_state_group() + + # We don't store state for outliers, so we don't generate a state + # froup for it. + context.state_group = None + defer.returnValue(context) if old_state: + # We already have the state, so we don't need to calculate it. + # Let's just correctly fill out the context and create a + # new state group for it. + context = EventContext() context.prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } - context.state_group = self.store.get_next_state_group() if event.is_state(): key = (event.type, event.state_key) @@ -229,6 +243,14 @@ class StateHandler(object): else: context.current_state_ids = context.prev_state_ids + context.state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=None, + delta_ids=None, + current_state_ids=context.current_state_ids, + ) + context.prev_state_events = [] defer.returnValue(context) @@ -242,7 +264,8 @@ class StateHandler(object): context = EventContext() context.prev_state_ids = curr_state if event.is_state(): - context.state_group = self.store.get_next_state_group() + # If this is a state event then we need to create a new state + # group for the state after this event. key = (event.type, event.state_key) if key in context.prev_state_ids: @@ -253,23 +276,42 @@ class StateHandler(object): context.current_state_ids[key] = event.event_id if entry.state_group: + # If the state at the event has a state group assigned then + # we can use that as the prev group context.prev_group = entry.state_group context.delta_ids = { key: event.event_id } elif entry.prev_group: + # If the state at the event only has a prev group, then we can + # use that as a prev group too. context.prev_group = entry.prev_group context.delta_ids = dict(entry.delta_ids) context.delta_ids[key] = event.event_id + + context.state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=context.prev_group, + delta_ids=context.delta_ids, + current_state_ids=context.current_state_ids, + ) else: + context.current_state_ids = context.prev_state_ids + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + if entry.state_group is None: - entry.state_group = self.store.get_next_state_group() + entry.state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=entry.prev_group, + delta_ids=entry.delta_ids, + current_state_ids=context.current_state_ids, + ) entry.state_id = entry.state_group context.state_group = entry.state_group - context.current_state_ids = context.prev_state_ids - context.prev_group = entry.prev_group - context.delta_ids = entry.delta_ids context.prev_state_events = [] defer.returnValue(context) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d01d46338a..f8fbd02ceb 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -124,7 +124,6 @@ class DataStore(RoomMemberStore, RoomStore, ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") - self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a6ae79dfad..8a0386c1a4 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -62,3 +62,9 @@ class PostgresEngine(object): def lock_table(self, txn, table): txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,)) + + def get_next_state_group_id(self, txn): + """Returns an int that can be used as a new state_group ID + """ + txn.execute("SELECT nextval('state_group_id_seq')") + return txn.fetchone()[0] diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index 755c9a1f07..60f0fa7fb3 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -16,6 +16,7 @@ from synapse.storage.prepare_database import prepare_database import struct +import threading class Sqlite3Engine(object): @@ -24,6 +25,11 @@ class Sqlite3Engine(object): def __init__(self, database_module, database_config): self.module = database_module + # The current max state_group, or None if we haven't looked + # in the DB yet. + self._current_state_group_id = None + self._current_state_group_id_lock = threading.Lock() + def check_database(self, txn): pass @@ -43,6 +49,19 @@ class Sqlite3Engine(object): def lock_table(self, txn, table): return + def get_next_state_group_id(self, txn): + """Returns an int that can be used as a new state_group ID + """ + # We do application locking here since if we're using sqlite then + # we are a single process synapse. + with self._current_state_group_id_lock: + if self._current_state_group_id is None: + txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups") + self._current_state_group_id = txn.fetchone()[0] + + self._current_state_group_id += 1 + return self._current_state_group_id + # Following functions taken from: https://github.com/coleifer/peewee diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2fead9eb0f..af56f1ee57 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -755,9 +755,8 @@ class EventsStore(SQLBaseStore): events_and_contexts=events_and_contexts, ) - # Insert into the state_groups, state_groups_state, and - # event_to_state_groups tables. - self._store_mult_state_groups_txn(txn, events_and_contexts) + # Insert into event_to_state_groups. + self._store_event_state_mappings_txn(txn, events_and_contexts) # _store_rejected_events_txn filters out any events which were # rejected, and returns the filtered list. @@ -992,10 +991,9 @@ class EventsStore(SQLBaseStore): # an outlier in the database. We now have some state at that # so we need to update the state_groups table with that state. - # insert into the state_group, state_groups_state and - # event_to_state_groups tables. + # insert into event_to_state_groups. try: - self._store_mult_state_groups_txn(txn, ((event, context),)) + self._store_event_state_mappings_txn(txn, ((event, context),)) except Exception: logger.exception("") raise diff --git a/synapse/storage/schema/delta/47/state_group_seq.py b/synapse/storage/schema/delta/47/state_group_seq.py new file mode 100644 index 0000000000..f6766501d2 --- /dev/null +++ b/synapse/storage/schema/delta/47/state_group_seq.py @@ -0,0 +1,37 @@ +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + # if we already have some state groups, we want to start making new + # ones with a higher id. + cur.execute("SELECT max(id) FROM state_groups") + row = cur.fetchone() + + if row[0] is None: + start_val = 1 + else: + start_val = row[0] + 1 + + cur.execute( + "CREATE SEQUENCE state_group_id_seq START WITH %s", + (start_val, ), + ) + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 360e3e4355..adb48df73e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -42,11 +42,8 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt return len(self.delta_ids) if self.delta_ids else 0 -class StateGroupReadStore(SQLBaseStore): - """The read-only parts of StateGroupStore - - None of these functions write to the state tables, so are suitable for - including in the SlavedStores. +class StateGroupWorkerStore(SQLBaseStore): + """The parts of StateGroupStore that can be called from workers. """ STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" @@ -54,7 +51,7 @@ class StateGroupReadStore(SQLBaseStore): CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" def __init__(self, db_conn, hs): - super(StateGroupReadStore, self).__init__(db_conn, hs) + super(StateGroupWorkerStore, self).__init__(db_conn, hs) self._state_group_cache = DictionaryCache( "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR @@ -549,116 +546,66 @@ class StateGroupReadStore(SQLBaseStore): defer.returnValue(results) + def store_state_group(self, event_id, room_id, prev_group, delta_ids, + current_state_ids): + """Store a new set of state, returning a newly assigned state group. -class StateStore(StateGroupReadStore, BackgroundUpdateStore): - """ Keeps track of the state at a given event. - - This is done by the concept of `state groups`. Every event is a assigned - a state group (identified by an arbitrary string), which references a - collection of state events. The current state of an event is then the - collection of state events referenced by the event's state group. - - Hence, every change in the current state causes a new state group to be - generated. However, if no change happens (e.g., if we get a message event - with only one parent it inherits the state group from its parent.) - - There are three tables: - * `state_groups`: Stores group name, first event with in the group and - room id. - * `event_to_state_groups`: Maps events to state groups. - * `state_groups_state`: Maps state group to state events. - """ - - STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" - STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" - CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" - - def __init__(self, db_conn, hs): - super(StateStore, self).__init__(db_conn, hs) - self.register_background_update_handler( - self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, - self._background_deduplicate_state, - ) - self.register_background_update_handler( - self.STATE_GROUP_INDEX_UPDATE_NAME, - self._background_index_state, - ) - self.register_background_index_update( - self.CURRENT_STATE_INDEX_UPDATE_NAME, - index_name="current_state_events_member_index", - table="current_state_events", - columns=["state_key"], - where_clause="type='m.room.member'", - ) - - def _have_persisted_state_group_txn(self, txn, state_group): - txn.execute( - "SELECT count(*) FROM state_groups WHERE id = ?", - (state_group,) - ) - row = txn.fetchone() - return row and row[0] - - def _store_mult_state_groups_txn(self, txn, events_and_contexts): - state_groups = {} - for event, context in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue + Args: + event_id (str): The event ID for which the state was calculated + room_id (str) + prev_group (int|None): A previous state group for the room, optional. + delta_ids (dict|None): The delta between state at `prev_group` and + `current_state_ids`, if `prev_group` was given. Same format as + `current_state_ids`. + current_state_ids (dict): The state to store. Map of (type, state_key) + to event_id. - if context.current_state_ids is None: + Returns: + Deferred[int]: The state group ID + """ + def _store_state_group_txn(txn): + if current_state_ids is None: # AFAIK, this can never happen - logger.error( - "Non-outlier event %s had current_state_ids==None", - event.event_id) - continue + raise Exception("current_state_ids cannot be None") - # if the event was rejected, just give it the same state as its - # predecessor. - if context.rejected: - state_groups[event.event_id] = context.prev_group - continue - - state_groups[event.event_id] = context.state_group - - if self._have_persisted_state_group_txn(txn, context.state_group): - continue + state_group = self.database_engine.get_next_state_group_id(txn) self._simple_insert_txn( txn, table="state_groups", values={ - "id": context.state_group, - "room_id": event.room_id, - "event_id": event.event_id, + "id": state_group, + "room_id": room_id, + "event_id": event_id, }, ) # We persist as a delta if we can, while also ensuring the chain # of deltas isn't tooo long, as otherwise read performance degrades. - if context.prev_group: + if prev_group: is_in_db = self._simple_select_one_onecol_txn( txn, table="state_groups", - keyvalues={"id": context.prev_group}, + keyvalues={"id": prev_group}, retcol="id", allow_none=True, ) if not is_in_db: raise Exception( "Trying to persist state with unpersisted prev_group: %r" - % (context.prev_group,) + % (prev_group,) ) potential_hops = self._count_state_group_hops_txn( - txn, context.prev_group + txn, prev_group ) - if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: + if prev_group and potential_hops < MAX_STATE_DELTA_HOPS: self._simple_insert_txn( txn, table="state_group_edges", values={ - "state_group": context.state_group, - "prev_state_group": context.prev_group, + "state_group": state_group, + "prev_state_group": prev_group, }, ) @@ -667,13 +614,13 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore): table="state_groups_state", values=[ { - "state_group": context.state_group, - "room_id": event.room_id, + "state_group": state_group, + "room_id": room_id, "type": key[0], "state_key": key[1], "event_id": state_id, } - for key, state_id in context.delta_ids.iteritems() + for key, state_id in delta_ids.iteritems() ], ) else: @@ -682,13 +629,13 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore): table="state_groups_state", values=[ { - "state_group": context.state_group, - "room_id": event.room_id, + "state_group": state_group, + "room_id": room_id, "type": key[0], "state_key": key[1], "event_id": state_id, } - for key, state_id in context.current_state_ids.iteritems() + for key, state_id in current_state_ids.iteritems() ], ) @@ -699,11 +646,71 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore): txn.call_after( self._state_group_cache.update, self._state_group_cache.sequence, - key=context.state_group, - value=dict(context.current_state_ids), + key=state_group, + value=dict(current_state_ids), full=True, ) + return state_group + + return self.runInteraction("store_state_group", _store_state_group_txn) + + +class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): + """ Keeps track of the state at a given event. + + This is done by the concept of `state groups`. Every event is a assigned + a state group (identified by an arbitrary string), which references a + collection of state events. The current state of an event is then the + collection of state events referenced by the event's state group. + + Hence, every change in the current state causes a new state group to be + generated. However, if no change happens (e.g., if we get a message event + with only one parent it inherits the state group from its parent.) + + There are three tables: + * `state_groups`: Stores group name, first event with in the group and + room id. + * `event_to_state_groups`: Maps events to state groups. + * `state_groups_state`: Maps state group to state events. + """ + + STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" + CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" + + def __init__(self, db_conn, hs): + super(StateStore, self).__init__(db_conn, hs) + self.register_background_update_handler( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, + self._background_deduplicate_state, + ) + self.register_background_update_handler( + self.STATE_GROUP_INDEX_UPDATE_NAME, + self._background_index_state, + ) + self.register_background_index_update( + self.CURRENT_STATE_INDEX_UPDATE_NAME, + index_name="current_state_events_member_index", + table="current_state_events", + columns=["state_key"], + where_clause="type='m.room.member'", + ) + + def _store_event_state_mappings_txn(self, txn, events_and_contexts): + state_groups = {} + for event, context in events_and_contexts: + if event.internal_metadata.is_outlier(): + continue + + # if the event was rejected, just give it the same state as its + # predecessor. + if context.rejected: + state_groups[event.event_id] = context.prev_group + continue + + state_groups[event.event_id] = context.state_group + self._simple_insert_many_txn( txn, table="event_to_state_groups", @@ -763,9 +770,6 @@ class StateStore(StateGroupReadStore, BackgroundUpdateStore): return count - def get_next_state_group(self): - return self._state_groups_id_gen.get_next() - @defer.inlineCallbacks def _background_deduplicate_state(self, progress, batch_size): """This background update will slowly deduplicate state by reencoding diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 105e1228bb..f430cce931 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -226,11 +226,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): context = EventContext() context.current_state_ids = state_ids context.prev_state_ids = state_ids - elif not backfill: + else: state_handler = self.hs.get_state_handler() context = yield state_handler.compute_event_context(event) - else: - context = EventContext() context.push_actions = push_actions diff --git a/tests/test_state.py b/tests/test_state.py index d16e1b3b8b..a5c5e55951 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -80,14 +80,14 @@ class StateGroupStore(object): return defer.succeed(groups) - def store_state_groups(self, event, context): - if context.current_state_ids is None: - return + def store_state_group(self, event_id, room_id, prev_group, delta_ids, + current_state_ids): + state_group = self._next_group + self._next_group += 1 - state_events = dict(context.current_state_ids) + self._group_to_state[state_group] = dict(current_state_ids) - self._group_to_state[context.state_group] = state_events - self._event_to_state_group[event.event_id] = context.state_group + return state_group def get_events(self, event_ids, **kwargs): return { @@ -95,10 +95,19 @@ class StateGroupStore(object): if e_id in self._event_id_to_event } + def get_state_group_delta(self, name): + return (None, None) + def register_events(self, events): for e in events: self._event_id_to_event[e.event_id] = e + def register_event_context(self, event, context): + self._event_to_state_group[event.event_id] = context.state_group + + def register_event_id_state_group(self, event_id, state_group): + self._event_to_state_group[event_id] = state_group + class DictObj(dict): def __init__(self, **kwargs): @@ -137,15 +146,7 @@ class Graph(object): class StateTestCase(unittest.TestCase): def setUp(self): - self.store = Mock( - spec_set=[ - "get_state_groups_ids", - "add_event_hashes", - "get_events", - "get_next_state_group", - "get_state_group_delta", - ] - ) + self.store = StateGroupStore() hs = Mock(spec_set=[ "get_datastore", "get_auth", "get_state_handler", "get_clock", "get_state_resolution_handler", @@ -156,9 +157,6 @@ class StateTestCase(unittest.TestCase): hs.get_auth.return_value = Auth(hs) hs.get_state_resolution_handler = lambda: StateResolutionHandler(hs) - self.store.get_next_state_group.side_effect = Mock - self.store.get_state_group_delta.return_value = (None, None) - self.state = StateHandler(hs) self.event_id = 0 @@ -197,14 +195,13 @@ class StateTestCase(unittest.TestCase): } ) - store = StateGroupStore() - self.store.get_state_groups_ids.side_effect = store.get_state_groups_ids + self.store.register_events(graph.walk()) context_store = {} for event in graph.walk(): context = yield self.state.compute_event_context(event) - store.store_state_groups(event, context) + self.store.register_event_context(event, context) context_store[event.event_id] = context self.assertEqual(2, len(context_store["D"].prev_state_ids)) @@ -249,16 +246,13 @@ class StateTestCase(unittest.TestCase): } ) - store = StateGroupStore() - self.store.get_state_groups_ids.side_effect = store.get_state_groups_ids - self.store.get_events = store.get_events - store.register_events(graph.walk()) + self.store.register_events(graph.walk()) context_store = {} for event in graph.walk(): context = yield self.state.compute_event_context(event) - store.store_state_groups(event, context) + self.store.register_event_context(event, context) context_store[event.event_id] = context self.assertSetEqual( @@ -315,16 +309,13 @@ class StateTestCase(unittest.TestCase): } ) - store = StateGroupStore() - self.store.get_state_groups_ids.side_effect = store.get_state_groups_ids - self.store.get_events = store.get_events - store.register_events(graph.walk()) + self.store.register_events(graph.walk()) context_store = {} for event in graph.walk(): context = yield self.state.compute_event_context(event) - store.store_state_groups(event, context) + self.store.register_event_context(event, context) context_store[event.event_id] = context self.assertSetEqual( @@ -398,16 +389,13 @@ class StateTestCase(unittest.TestCase): self._add_depths(nodes, edges) graph = Graph(nodes, edges) - store = StateGroupStore() - self.store.get_state_groups_ids.side_effect = store.get_state_groups_ids - self.store.get_events = store.get_events - store.register_events(graph.walk()) + self.store.register_events(graph.walk()) context_store = {} for event in graph.walk(): context = yield self.state.compute_event_context(event) - store.store_state_groups(event, context) + self.store.register_event_context(event, context) context_store[event.event_id] = context self.assertSetEqual( @@ -467,7 +455,11 @@ class StateTestCase(unittest.TestCase): @defer.inlineCallbacks def test_trivial_annotate_message(self): - event = create_event(type="test_message", name="event") + prev_event_id = "prev_event_id" + event = create_event( + type="test_message", name="event2", + prev_events=[(prev_event_id, {})], + ) old_state = [ create_event(type="test1", state_key="1"), @@ -475,11 +467,11 @@ class StateTestCase(unittest.TestCase): create_event(type="test2", state_key=""), ] - group_name = "group_name_1" - - self.store.get_state_groups_ids.return_value = { - group_name: {(e.type, e.state_key): e.event_id for e in old_state}, - } + group_name = self.store.store_state_group( + prev_event_id, event.room_id, None, None, + {(e.type, e.state_key): e.event_id for e in old_state}, + ) + self.store.register_event_id_state_group(prev_event_id, group_name) context = yield self.state.compute_event_context(event) @@ -492,7 +484,11 @@ class StateTestCase(unittest.TestCase): @defer.inlineCallbacks def test_trivial_annotate_state(self): - event = create_event(type="state", state_key="", name="event") + prev_event_id = "prev_event_id" + event = create_event( + type="state", state_key="", name="event2", + prev_events=[(prev_event_id, {})], + ) old_state = [ create_event(type="test1", state_key="1"), @@ -500,11 +496,11 @@ class StateTestCase(unittest.TestCase): create_event(type="test2", state_key=""), ] - group_name = "group_name_1" - - self.store.get_state_groups_ids.return_value = { - group_name: {(e.type, e.state_key): e.event_id for e in old_state}, - } + group_name = self.store.store_state_group( + prev_event_id, event.room_id, None, None, + {(e.type, e.state_key): e.event_id for e in old_state}, + ) + self.store.register_event_id_state_group(prev_event_id, group_name) context = yield self.state.compute_event_context(event) @@ -517,7 +513,12 @@ class StateTestCase(unittest.TestCase): @defer.inlineCallbacks def test_resolve_message_conflict(self): - event = create_event(type="test_message", name="event") + prev_event_id1 = "event_id1" + prev_event_id2 = "event_id2" + event = create_event( + type="test_message", name="event3", + prev_events=[(prev_event_id1, {}), (prev_event_id2, {})], + ) creation = create_event( type=EventTypes.Create, state_key="" @@ -537,12 +538,12 @@ class StateTestCase(unittest.TestCase): create_event(type="test4", state_key=""), ] - store = StateGroupStore() - store.register_events(old_state_1) - store.register_events(old_state_2) - self.store.get_events = store.get_events + self.store.register_events(old_state_1) + self.store.register_events(old_state_2) - context = yield self._get_context(event, old_state_1, old_state_2) + context = yield self._get_context( + event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, + ) self.assertEqual(len(context.current_state_ids), 6) @@ -550,7 +551,12 @@ class StateTestCase(unittest.TestCase): @defer.inlineCallbacks def test_resolve_state_conflict(self): - event = create_event(type="test4", state_key="", name="event") + prev_event_id1 = "event_id1" + prev_event_id2 = "event_id2" + event = create_event( + type="test4", state_key="", name="event", + prev_events=[(prev_event_id1, {}), (prev_event_id2, {})], + ) creation = create_event( type=EventTypes.Create, state_key="" @@ -575,7 +581,9 @@ class StateTestCase(unittest.TestCase): store.register_events(old_state_2) self.store.get_events = store.get_events - context = yield self._get_context(event, old_state_1, old_state_2) + context = yield self._get_context( + event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, + ) self.assertEqual(len(context.current_state_ids), 6) @@ -583,7 +591,12 @@ class StateTestCase(unittest.TestCase): @defer.inlineCallbacks def test_standard_depth_conflict(self): - event = create_event(type="test4", name="event") + prev_event_id1 = "event_id1" + prev_event_id2 = "event_id2" + event = create_event( + type="test4", name="event", + prev_events=[(prev_event_id1, {}), (prev_event_id2, {})], + ) member_event = create_event( type=EventTypes.Member, @@ -615,7 +628,9 @@ class StateTestCase(unittest.TestCase): store.register_events(old_state_2) self.store.get_events = store.get_events - context = yield self._get_context(event, old_state_1, old_state_2) + context = yield self._get_context( + event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, + ) self.assertEqual( old_state_2[2].event_id, context.current_state_ids[("test1", "1")] @@ -639,19 +654,26 @@ class StateTestCase(unittest.TestCase): store.register_events(old_state_1) store.register_events(old_state_2) - context = yield self._get_context(event, old_state_1, old_state_2) + context = yield self._get_context( + event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, + ) self.assertEqual( old_state_1[2].event_id, context.current_state_ids[("test1", "1")] ) - def _get_context(self, event, old_state_1, old_state_2): - group_name_1 = "group_name_1" - group_name_2 = "group_name_2" + def _get_context(self, event, prev_event_id_1, old_state_1, prev_event_id_2, + old_state_2): + sg1 = self.store.store_state_group( + prev_event_id_1, event.room_id, None, None, + {(e.type, e.state_key): e.event_id for e in old_state_1}, + ) + self.store.register_event_id_state_group(prev_event_id_1, sg1) - self.store.get_state_groups_ids.return_value = { - group_name_1: {(e.type, e.state_key): e.event_id for e in old_state_1}, - group_name_2: {(e.type, e.state_key): e.event_id for e in old_state_2}, - } + sg2 = self.store.store_state_group( + prev_event_id_2, event.room_id, None, None, + {(e.type, e.state_key): e.event_id for e in old_state_2}, + ) + self.store.register_event_id_state_group(prev_event_id_2, sg2) return self.state.compute_event_context(event) -- cgit 1.5.1 From 770b2252ca9b5a74c3e767bfe0c5e7bb2e84ebd3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Feb 2018 16:31:50 +0000 Subject: s/_create_new_client_event/create_new_client_event/ --- synapse/handlers/federation.py | 10 +++++----- synapse/handlers/message.py | 6 +++--- tests/storage/test_redaction.py | 6 +++--- tests/storage/test_roommember.py | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e6b9f5cf53..06d6c8425b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1008,7 +1008,7 @@ class FederationHandler(BaseHandler): }) try: - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) except AuthError as e: @@ -1248,7 +1248,7 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) @@ -2119,7 +2119,7 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder ) @@ -2157,7 +2157,7 @@ class FederationHandler(BaseHandler): """ builder = self.event_builder_factory.new(event_dict) - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) @@ -2207,7 +2207,7 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) defer.returnValue((event, context)) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index afa19bf653..e8e6a89a3c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -378,7 +378,7 @@ class EventCreationHandler(object): if txn_id is not None: builder.internal_metadata.txn_id = txn_id - event, context = yield self._create_new_client_event( + event, context = yield self.create_new_client_event( builder=builder, requester=requester, prev_event_ids=prev_event_ids, @@ -486,9 +486,9 @@ class EventCreationHandler(object): ) defer.returnValue(event) - @measure_func("_create_new_client_event") + @measure_func("create_new_client_event") @defer.inlineCallbacks - def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): + def create_new_client_event(self, builder, requester=None, prev_event_ids=None): if prev_event_ids: prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index de6d7904e3..888ddfaddd 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -58,7 +58,7 @@ class RedactionTestCase(unittest.TestCase): "content": content, }) - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder ) @@ -78,7 +78,7 @@ class RedactionTestCase(unittest.TestCase): "content": {"body": body, "msgtype": u"message"}, }) - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder ) @@ -97,7 +97,7 @@ class RedactionTestCase(unittest.TestCase): "redacts": event_id, }) - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder ) diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index 4aff38bd58..657b279e5d 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -57,7 +57,7 @@ class RoomMemberStoreTestCase(unittest.TestCase): "content": {"membership": membership}, }) - event, context = yield self.event_creation_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder ) -- cgit 1.5.1 From 3e1e69ccafbfdf8aa7c0cd06bc4eaf948a6bafdf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Feb 2018 16:40:38 +0000 Subject: Update copyright --- synapse/handlers/federation.py | 1 + synapse/handlers/message.py | 2 +- synapse/handlers/room.py | 1 + synapse/handlers/room_member.py | 1 + synapse/rest/client/v1/admin.py | 1 + synapse/rest/client/v1/room.py | 1 + 6 files changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 06d6c8425b..cba96111d1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e8e6a89a3c..1540721815 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd -# Copyright 2017 New Vector Ltd +# Copyright 2017 - 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4ea5bf1bcf..6ab020bf41 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ab58beb0f5..37dc5e99ab 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index f77f646670..20c5c66632 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index ad6534537a..fbb2fc36e4 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. -- cgit 1.5.1 From e3624fad5f0dfd3cffcbb7c996a8d29bb2c79dbd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Feb 2018 10:30:25 +0000 Subject: Remove pointless ratelimit check The intention was for the check to be called as early as possible in the request, but actually was called just before the main ratelimit check, so was fairly pointless. --- synapse/handlers/message.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1540721815..a58fc37fff 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -403,11 +403,6 @@ class EventCreationHandler(object): "Tried to send member event through non-member codepath" ) - # We check here if we are currently being rate limited, so that we - # don't do unnecessary work. We check again just before we actually - # send the event. - yield self.base_handler.ratelimit(requester, update=False) - user = UserID.from_string(event.sender) assert self.hs.is_mine(user), "User must be our own: %s" % (user,) -- cgit 1.5.1 From 671540dccf3996620ffe65705904fb911e21fb68 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 7 Feb 2018 17:27:08 +0000 Subject: rename delete_old_state -> purge_history (beacause it deletes more than state) --- synapse/handlers/message.py | 2 +- synapse/storage/events.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21f1717dd2..1c7860bb05 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -72,7 +72,7 @@ class MessageHandler(BaseHandler): depth = event.depth with (yield self.pagination_lock.write(room_id)): - yield self.store.delete_old_state(room_id, depth) + yield self.store.purge_history(room_id, depth) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7a9cd3ec90..21533970d1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2031,16 +2031,16 @@ class EventsStore(SQLBaseStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) - def delete_old_state(self, room_id, topological_ordering): + def purge_history(self, room_id, topological_ordering): + """Deletes room history before a certain point + """ + return self.runInteraction( - "delete_old_state", - self._delete_old_state_txn, room_id, topological_ordering + "purge_history", + self._purge_history_txn, room_id, topological_ordering ) - def _delete_old_state_txn(self, txn, room_id, topological_ordering): - """Deletes old room state - """ - + def _purge_history_txn(self, txn, room_id, topological_ordering): # Tables that should be pruned: # event_auth # event_backward_extremities -- cgit 1.5.1 From 74fcbf741b3a7b95b5cc44478050e8a40fb7dc46 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 8 Feb 2018 18:44:52 +0000 Subject: delete_local_events for purge_history Add a flag which makes the purger delete local events --- docs/admin_api/purge_history_api.rst | 14 ++++++++++++-- synapse/handlers/message.py | 4 ++-- synapse/http/servlet.py | 18 +++++++++++++++--- synapse/rest/client/v1/admin.py | 11 ++++++++++- synapse/storage/events.py | 35 ++++++++++++++++++++++++++++------- 5 files changed, 67 insertions(+), 15 deletions(-) (limited to 'synapse/handlers') diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index 08b3306366..b4e5bd9d75 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -4,8 +4,6 @@ Purge History API The purge history API allows server admins to purge historic events from their database, reclaiming disk space. -**NB!** This will not delete local events (locally sent messages content etc) from the database, but will remove lots of the metadata about them and does dramatically reduce the on disk space usage - Depending on the amount of history being purged a call to the API may take several minutes or longer. During this period users will not be able to paginate further back in the room from the point being purged from. @@ -15,3 +13,15 @@ The API is simply: ``POST /_matrix/client/r0/admin/purge_history//`` including an ``access_token`` of a server admin. + +By default, events sent by local users are not deleted, as they may represent +the only copies of this content in existence. (Events sent by remote users are +deleted, and room state data before the cutoff is always removed). + +To delete local events as well, set ``delete_local_events`` in the body: + +.. code:: json + + { + "delete_local_events": True, + } diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1c7860bb05..276d1a7722 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -63,7 +63,7 @@ class MessageHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() @defer.inlineCallbacks - def purge_history(self, room_id, event_id): + def purge_history(self, room_id, event_id, delete_local_events=False): event = yield self.store.get_event(event_id) if event.room_id != room_id: @@ -72,7 +72,7 @@ class MessageHandler(BaseHandler): depth = event.depth with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history(room_id, depth) + yield self.store.purge_history(room_id, depth, delete_local_events) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 71420e54db..ef8e62901b 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -148,11 +148,13 @@ def parse_string_from_args(args, name, default=None, required=False, return default -def parse_json_value_from_request(request): +def parse_json_value_from_request(request, allow_empty_body=False): """Parse a JSON value from the body of a twisted HTTP request. Args: request: the twisted HTTP request. + allow_empty_body (bool): if True, an empty body will be accepted and + turned into None Returns: The JSON value. @@ -165,6 +167,9 @@ def parse_json_value_from_request(request): except Exception: raise SynapseError(400, "Error reading JSON content.") + if not content_bytes and allow_empty_body: + return None + try: content = simplejson.loads(content_bytes) except Exception as e: @@ -174,17 +179,24 @@ def parse_json_value_from_request(request): return content -def parse_json_object_from_request(request): +def parse_json_object_from_request(request, allow_empty_body=False): """Parse a JSON object from the body of a twisted HTTP request. Args: request: the twisted HTTP request. + allow_empty_body (bool): if True, an empty body will be accepted and + turned into an empty dict. Raises: SynapseError if the request body couldn't be decoded as JSON or if it wasn't a JSON object. """ - content = parse_json_value_from_request(request) + content = parse_json_value_from_request( + request, allow_empty_body=allow_empty_body, + ) + + if allow_empty_body and content is None: + return {} if type(content) != dict: message = "Content must be a JSON object." diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 5022808ea9..f954d2ea65 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -128,7 +128,16 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - yield self.handlers.message_handler.purge_history(room_id, event_id) + body = parse_json_object_from_request(request, allow_empty_body=True) + + delete_local_events = bool( + body.get("delete_local_history", False) + ) + + yield self.handlers.message_handler.purge_history( + room_id, event_id, + delete_local_events=delete_local_events, + ) defer.returnValue((200, {})) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 24d9978304..11a2ff2d8a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2031,16 +2031,32 @@ class EventsStore(SQLBaseStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) - def purge_history(self, room_id, topological_ordering): + def purge_history( + self, room_id, topological_ordering, delete_local_events, + ): """Deletes room history before a certain point + + Args: + room_id (str): + + topological_ordering (int): + minimum topo ordering to preserve + + delete_local_events (bool): + if True, we will delete local events as well as remote ones + (instead of just marking them as outliers and deleting their + state groups). """ return self.runInteraction( "purge_history", - self._purge_history_txn, room_id, topological_ordering + self._purge_history_txn, room_id, topological_ordering, + delete_local_events, ) - def _purge_history_txn(self, txn, room_id, topological_ordering): + def _purge_history_txn( + self, txn, room_id, topological_ordering, delete_local_events, + ): # Tables that should be pruned: # event_auth # event_backward_extremities @@ -2093,11 +2109,14 @@ class EventsStore(SQLBaseStore): to_delete = [ (event_id,) for event_id, state_key in event_rows - if state_key is None and not self.hs.is_mine_id(event_id) + if state_key is None and ( + delete_local_events or not self.hs.is_mine_id(event_id) + ) ] logger.info( - "[purge] found %i events before cutoff, of which %i are remote" - " non-state events to delete", len(event_rows), len(to_delete)) + "[purge] found %i events before cutoff, of which %i can be deleted", + len(event_rows), len(to_delete), + ) logger.info("[purge] Finding new backward extremities") @@ -2273,7 +2292,9 @@ class EventsStore(SQLBaseStore): " WHERE event_id = ?", [ (True, event_id,) for event_id, state_key in event_rows - if state_key is not None or self.hs.is_mine_id(event_id) + if state_key is not None or ( + not delete_local_events and self.hs.is_mine_id(event_id) + ) ] ) -- cgit 1.5.1