diff options
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 42 |
1 files changed, 25 insertions, 17 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index e3b9ff5ca6..91f8abb67d 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -565,7 +565,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): AND e.stream_ordering > ? AND e.stream_ordering <= ? ORDER BY e.stream_ordering ASC """ - txn.execute(sql, (user_id, min_from_id, max_to_id,)) + txn.execute( + sql, + ( + user_id, + min_from_id, + max_to_id, + ), + ) rows = [ _EventDictReturn(event_id, None, stream_ordering) @@ -695,7 +702,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): return "t%d-%d" % (topo, token) def get_stream_id_for_event_txn( - self, txn: LoggingTransaction, event_id: str, allow_none=False, + self, + txn: LoggingTransaction, + event_id: str, + allow_none=False, ) -> int: return self.db_pool.simple_select_one_onecol_txn( txn=txn, @@ -706,8 +716,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): ) async def get_position_for_event(self, event_id: str) -> PersistedEventPosition: - """Get the persisted position for an event - """ + """Get the persisted position for an event""" row = await self.db_pool.simple_select_one( table="events", keyvalues={"event_id": event_id}, @@ -897,19 +906,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): ) -> Tuple[int, List[EventBase]]: """Get all new events - Returns all events with from_id < stream_ordering <= current_id. + Returns all events with from_id < stream_ordering <= current_id. - Args: - from_id: the stream_ordering of the last event we processed - current_id: the stream_ordering of the most recently processed event - limit: the maximum number of events to return + Args: + from_id: the stream_ordering of the last event we processed + current_id: the stream_ordering of the most recently processed event + limit: the maximum number of events to return - Returns: - A tuple of (next_id, events), where `next_id` is the next value to - pass as `from_id` (it will either be the stream_ordering of the - last returned event, or, if fewer than `limit` events were found, - the `current_id`). - """ + Returns: + A tuple of (next_id, events), where `next_id` is the next value to + pass as `from_id` (it will either be the stream_ordering of the + last returned event, or, if fewer than `limit` events were found, + the `current_id`). + """ def get_all_new_events_stream_txn(txn): sql = ( @@ -1238,8 +1247,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): @cached() async def get_id_for_instance(self, instance_name: str) -> int: - """Get a unique, immutable ID that corresponds to the given Synapse worker instance. - """ + """Get a unique, immutable ID that corresponds to the given Synapse worker instance.""" def _get_id_for_instance_txn(txn): instance_id = self.db_pool.simple_select_one_onecol_txn( |