diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2020-02-11 16:18:29 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2020-02-11 16:18:29 +0000 |
commit | 0295abdcf70548a7ba9d685598233f34c50127b5 (patch) | |
tree | 766e1facba2e4efefe8b058fb2e71bf7f2619a81 /synapse/handlers/pagination.py | |
parent | Merge pull request #5042 from matrix-org/erikj/fix_get_missing_events_error (diff) | |
download | synapse-0295abdcf70548a7ba9d685598233f34c50127b5.tar.xz |
Dinsic Blacking with black==18.6b2
Diffstat (limited to 'synapse/handlers/pagination.py')
-rw-r--r-- | synapse/handlers/pagination.py | 81 |
1 files changed, 35 insertions, 46 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 3cf783e3bd..be54084088 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -58,9 +58,7 @@ class PurgeStatus(object): self.status = PurgeStatus.STATUS_ACTIVE def asdict(self): - return { - "status": PurgeStatus.STATUS_TEXT[self.status] - } + return {"status": PurgeStatus.STATUS_TEXT[self.status]} class PaginationHandler(object): @@ -153,20 +151,19 @@ class PaginationHandler(object): # Figure out what token we should start purging at. ts = self.clock.time_msec() - max_lifetime - stream_ordering = ( - yield self.store.find_first_stream_ordering_after_ts(ts) - ) + stream_ordering = (yield self.store.find_first_stream_ordering_after_ts(ts)) r = ( yield self.store.get_room_event_after_stream_ordering( - room_id, stream_ordering, + room_id, stream_ordering ) ) if not r: logger.warning( "[purge] purging events not possible: No event found " "(ts %i => stream_ordering %i)", - ts, stream_ordering, + ts, + stream_ordering, ) continue @@ -185,13 +182,10 @@ class PaginationHandler(object): # the background so that it's not blocking any other operation apart from # other purges in the same room. run_as_background_process( - "_purge_history", - self._purge_history, - purge_id, room_id, token, True, + "_purge_history", self._purge_history, purge_id, room_id, token, True ) - def start_purge_history(self, room_id, token, - delete_local_events=False): + def start_purge_history(self, room_id, token, delete_local_events=False): """Start off a history purge on a room. Args: @@ -206,8 +200,7 @@ class PaginationHandler(object): """ if room_id in self._purges_in_progress_by_room: raise SynapseError( - 400, - "History purge already in progress for %s" % (room_id, ), + 400, "History purge already in progress for %s" % (room_id,) ) purge_id = random_string(16) @@ -218,14 +211,12 @@ class PaginationHandler(object): self._purges_by_id[purge_id] = PurgeStatus() run_in_background( - self._purge_history, - purge_id, room_id, token, delete_local_events, + self._purge_history, purge_id, room_id, token, delete_local_events ) return purge_id @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, token, - delete_local_events): + def _purge_history(self, purge_id, room_id, token, delete_local_events): """Carry out a history purge on a room. Args: @@ -241,16 +232,13 @@ class PaginationHandler(object): self._purges_in_progress_by_room.add(room_id) try: with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history( - room_id, token, delete_local_events, - ) + yield self.store.purge_history(room_id, token, delete_local_events) logger.info("[purge] complete") self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE except Exception: f = Failure() logger.error( - "[purge] failed", - exc_info=(f.type, f.value, f.getTracebackObject()), + "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) ) self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED finally: @@ -259,6 +247,7 @@ class PaginationHandler(object): # remove the purge from the list 24 hours after it completes def clear_purge(): del self._purges_by_id[purge_id] + self.hs.get_reactor().callLater(24 * 3600, clear_purge) def get_purge_status(self, purge_id): @@ -273,8 +262,14 @@ class PaginationHandler(object): return self._purges_by_id.get(purge_id) @defer.inlineCallbacks - def get_messages(self, requester, room_id=None, pagin_config=None, - as_client_event=True, event_filter=None): + def get_messages( + self, + requester, + room_id=None, + pagin_config=None, + as_client_event=True, + event_filter=None, + ): """Get messages in a room. Args: @@ -312,7 +307,7 @@ class PaginationHandler(object): room_id, user_id ) - if source_config.direction == 'b': + if source_config.direction == "b": # if we're going backwards, we might need to backfill. This # requires that we have a topo token. if room_token.topological: @@ -346,27 +341,24 @@ class PaginationHandler(object): event_filter=event_filter, ) - next_token = pagin_config.from_token.copy_and_replace( - "room_key", next_key - ) + next_token = pagin_config.from_token.copy_and_replace("room_key", next_key) if events: if event_filter: events = event_filter.filter(events) events = yield filter_events_for_client( - self.store, - user_id, - events, - is_peeking=(member_event_id is None), + self.store, user_id, events, is_peeking=(member_event_id is None) ) if not events: - defer.returnValue({ - "chunk": [], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - }) + defer.returnValue( + { + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + ) state = None if event_filter and event_filter.lazy_load_members() and len(events) > 0: @@ -374,12 +366,11 @@ class PaginationHandler(object): # FIXME: we also care about invite targets etc. state_filter = StateFilter.from_types( - (EventTypes.Member, event.sender) - for event in events + (EventTypes.Member, event.sender) for event in events ) state_ids = yield self.store.get_state_ids_for_event( - events[0].event_id, state_filter=state_filter, + events[0].event_id, state_filter=state_filter ) if state_ids: @@ -391,8 +382,7 @@ class PaginationHandler(object): chunk = { "chunk": ( yield self._event_serializer.serialize_events( - events, time_now, - as_client_event=as_client_event, + events, time_now, as_client_event=as_client_event ) ), "start": pagin_config.from_token.to_string(), @@ -402,8 +392,7 @@ class PaginationHandler(object): if state: chunk["state"] = ( yield self._event_serializer.serialize_events( - state, time_now, - as_client_event=as_client_event, + state, time_now, as_client_event=as_client_event ) ) |