summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py42
1 files changed, 25 insertions, 17 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e3b9ff5ca6..91f8abb67d 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -565,7 +565,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
                     AND e.stream_ordering > ? AND e.stream_ordering <= ?
                 ORDER BY e.stream_ordering ASC
             """
-            txn.execute(sql, (user_id, min_from_id, max_to_id,))
+            txn.execute(
+                sql,
+                (
+                    user_id,
+                    min_from_id,
+                    max_to_id,
+                ),
+            )
 
             rows = [
                 _EventDictReturn(event_id, None, stream_ordering)
@@ -695,7 +702,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
             return "t%d-%d" % (topo, token)
 
     def get_stream_id_for_event_txn(
-        self, txn: LoggingTransaction, event_id: str, allow_none=False,
+        self,
+        txn: LoggingTransaction,
+        event_id: str,
+        allow_none=False,
     ) -> int:
         return self.db_pool.simple_select_one_onecol_txn(
             txn=txn,
@@ -706,8 +716,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
         )
 
     async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
-        """Get the persisted position for an event
-        """
+        """Get the persisted position for an event"""
         row = await self.db_pool.simple_select_one(
             table="events",
             keyvalues={"event_id": event_id},
@@ -897,19 +906,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
     ) -> Tuple[int, List[EventBase]]:
         """Get all new events
 
-         Returns all events with from_id < stream_ordering <= current_id.
+        Returns all events with from_id < stream_ordering <= current_id.
 
-         Args:
-             from_id:  the stream_ordering of the last event we processed
-             current_id:  the stream_ordering of the most recently processed event
-             limit: the maximum number of events to return
+        Args:
+            from_id:  the stream_ordering of the last event we processed
+            current_id:  the stream_ordering of the most recently processed event
+            limit: the maximum number of events to return
 
-         Returns:
-             A tuple of (next_id, events), where `next_id` is the next value to
-             pass as `from_id` (it will either be the stream_ordering of the
-             last returned event, or, if fewer than `limit` events were found,
-             the `current_id`).
-         """
+        Returns:
+            A tuple of (next_id, events), where `next_id` is the next value to
+            pass as `from_id` (it will either be the stream_ordering of the
+            last returned event, or, if fewer than `limit` events were found,
+            the `current_id`).
+        """
 
         def get_all_new_events_stream_txn(txn):
             sql = (
@@ -1238,8 +1247,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
     @cached()
     async def get_id_for_instance(self, instance_name: str) -> int:
-        """Get a unique, immutable ID that corresponds to the given Synapse worker instance.
-        """
+        """Get a unique, immutable ID that corresponds to the given Synapse worker instance."""
 
         def _get_id_for_instance_txn(txn):
             instance_id = self.db_pool.simple_select_one_onecol_txn(