diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 3cf783e3bd..be54084088 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -58,9 +58,7 @@ class PurgeStatus(object):
self.status = PurgeStatus.STATUS_ACTIVE
def asdict(self):
- return {
- "status": PurgeStatus.STATUS_TEXT[self.status]
- }
+ return {"status": PurgeStatus.STATUS_TEXT[self.status]}
class PaginationHandler(object):
@@ -153,20 +151,19 @@ class PaginationHandler(object):
# Figure out what token we should start purging at.
ts = self.clock.time_msec() - max_lifetime
- stream_ordering = (
- yield self.store.find_first_stream_ordering_after_ts(ts)
- )
+ stream_ordering = (yield self.store.find_first_stream_ordering_after_ts(ts))
r = (
yield self.store.get_room_event_after_stream_ordering(
- room_id, stream_ordering,
+ room_id, stream_ordering
)
)
if not r:
logger.warning(
"[purge] purging events not possible: No event found "
"(ts %i => stream_ordering %i)",
- ts, stream_ordering,
+ ts,
+ stream_ordering,
)
continue
@@ -185,13 +182,10 @@ class PaginationHandler(object):
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
- "_purge_history",
- self._purge_history,
- purge_id, room_id, token, True,
+ "_purge_history", self._purge_history, purge_id, room_id, token, True
)
- def start_purge_history(self, room_id, token,
- delete_local_events=False):
+ def start_purge_history(self, room_id, token, delete_local_events=False):
"""Start off a history purge on a room.
Args:
@@ -206,8 +200,7 @@ class PaginationHandler(object):
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
- 400,
- "History purge already in progress for %s" % (room_id, ),
+ 400, "History purge already in progress for %s" % (room_id,)
)
purge_id = random_string(16)
@@ -218,14 +211,12 @@ class PaginationHandler(object):
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
- self._purge_history,
- purge_id, room_id, token, delete_local_events,
+ self._purge_history, purge_id, room_id, token, delete_local_events
)
return purge_id
@defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token,
- delete_local_events):
+ def _purge_history(self, purge_id, room_id, token, delete_local_events):
"""Carry out a history purge on a room.
Args:
@@ -241,16 +232,13 @@ class PaginationHandler(object):
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(
- room_id, token, delete_local_events,
- )
+ yield self.store.purge_history(room_id, token, delete_local_events)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
f = Failure()
logger.error(
- "[purge] failed",
- exc_info=(f.type, f.value, f.getTracebackObject()),
+ "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
)
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
finally:
@@ -259,6 +247,7 @@ class PaginationHandler(object):
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
+
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
@@ -273,8 +262,14 @@ class PaginationHandler(object):
return self._purges_by_id.get(purge_id)
@defer.inlineCallbacks
- def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True, event_filter=None):
+ def get_messages(
+ self,
+ requester,
+ room_id=None,
+ pagin_config=None,
+ as_client_event=True,
+ event_filter=None,
+ ):
"""Get messages in a room.
Args:
@@ -312,7 +307,7 @@ class PaginationHandler(object):
room_id, user_id
)
- if source_config.direction == 'b':
+ if source_config.direction == "b":
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
@@ -346,27 +341,24 @@ class PaginationHandler(object):
event_filter=event_filter,
)
- next_token = pagin_config.from_token.copy_and_replace(
- "room_key", next_key
- )
+ next_token = pagin_config.from_token.copy_and_replace("room_key", next_key)
if events:
if event_filter:
events = event_filter.filter(events)
events = yield filter_events_for_client(
- self.store,
- user_id,
- events,
- is_peeking=(member_event_id is None),
+ self.store, user_id, events, is_peeking=(member_event_id is None)
)
if not events:
- defer.returnValue({
- "chunk": [],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- })
+ defer.returnValue(
+ {
+ "chunk": [],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ }
+ )
state = None
if event_filter and event_filter.lazy_load_members() and len(events) > 0:
@@ -374,12 +366,11 @@ class PaginationHandler(object):
# FIXME: we also care about invite targets etc.
state_filter = StateFilter.from_types(
- (EventTypes.Member, event.sender)
- for event in events
+ (EventTypes.Member, event.sender) for event in events
)
state_ids = yield self.store.get_state_ids_for_event(
- events[0].event_id, state_filter=state_filter,
+ events[0].event_id, state_filter=state_filter
)
if state_ids:
@@ -391,8 +382,7 @@ class PaginationHandler(object):
chunk = {
"chunk": (
yield self._event_serializer.serialize_events(
- events, time_now,
- as_client_event=as_client_event,
+ events, time_now, as_client_event=as_client_event
)
),
"start": pagin_config.from_token.to_string(),
@@ -402,8 +392,7 @@ class PaginationHandler(object):
if state:
chunk["state"] = (
yield self._event_serializer.serialize_events(
- state, time_now,
- as_client_event=as_client_event,
+ state, time_now, as_client_event=as_client_event
)
)