diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e3b9ff5ca6..91f8abb67d 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -565,7 +565,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
AND e.stream_ordering > ? AND e.stream_ordering <= ?
ORDER BY e.stream_ordering ASC
"""
- txn.execute(sql, (user_id, min_from_id, max_to_id,))
+ txn.execute(
+ sql,
+ (
+ user_id,
+ min_from_id,
+ max_to_id,
+ ),
+ )
rows = [
_EventDictReturn(event_id, None, stream_ordering)
@@ -695,7 +702,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
return "t%d-%d" % (topo, token)
def get_stream_id_for_event_txn(
- self, txn: LoggingTransaction, event_id: str, allow_none=False,
+ self,
+ txn: LoggingTransaction,
+ event_id: str,
+ allow_none=False,
) -> int:
return self.db_pool.simple_select_one_onecol_txn(
txn=txn,
@@ -706,8 +716,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
)
async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
- """Get the persisted position for an event
- """
+ """Get the persisted position for an event"""
row = await self.db_pool.simple_select_one(
table="events",
keyvalues={"event_id": event_id},
@@ -897,19 +906,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
) -> Tuple[int, List[EventBase]]:
"""Get all new events
- Returns all events with from_id < stream_ordering <= current_id.
+ Returns all events with from_id < stream_ordering <= current_id.
- Args:
- from_id: the stream_ordering of the last event we processed
- current_id: the stream_ordering of the most recently processed event
- limit: the maximum number of events to return
+ Args:
+ from_id: the stream_ordering of the last event we processed
+ current_id: the stream_ordering of the most recently processed event
+ limit: the maximum number of events to return
- Returns:
- A tuple of (next_id, events), where `next_id` is the next value to
- pass as `from_id` (it will either be the stream_ordering of the
- last returned event, or, if fewer than `limit` events were found,
- the `current_id`).
- """
+ Returns:
+ A tuple of (next_id, events), where `next_id` is the next value to
+ pass as `from_id` (it will either be the stream_ordering of the
+ last returned event, or, if fewer than `limit` events were found,
+ the `current_id`).
+ """
def get_all_new_events_stream_txn(txn):
sql = (
@@ -1238,8 +1247,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
@cached()
async def get_id_for_instance(self, instance_name: str) -> int:
- """Get a unique, immutable ID that corresponds to the given Synapse worker instance.
- """
+ """Get a unique, immutable ID that corresponds to the given Synapse worker instance."""
def _get_id_for_instance_txn(txn):
instance_id = self.db_pool.simple_select_one_onecol_txn(