diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 44 |
1 files changed, 25 insertions, 19 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 529ad4ea79..d9482a3843 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -65,7 +65,7 @@ _EventDictReturn = namedtuple( def generate_pagination_where_clause( - direction, column_names, from_token, to_token, engine, + direction, column_names, from_token, to_token, engine ): """Creates an SQL expression to bound the columns by the pagination tokens. @@ -153,7 +153,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine): str """ - assert(bound in (">", "<", ">=", "<=")) + assert bound in (">", "<", ">=", "<=") name1, name2 = column_names val1, val2 = values @@ -169,11 +169,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine): # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we # use the later form when running against postgres. - return "((%d,%d) %s (%s,%s))" % ( - val1, val2, - bound, - name1, name2, - ) + return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2) # We want to generate queries of e.g. the form: # @@ -276,7 +272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_rooms( - self, room_ids, from_key, to_key, limit=0, order='DESC' + self, room_ids, from_key, to_key, limit=0, order="DESC" ): """Get new room events in stream ordering since `from_key`. @@ -346,7 +342,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_room( - self, room_id, from_key, to_key, limit=0, order='DESC' + self, room_id, from_key, to_key, limit=0, order="DESC" ): """Get new room events in stream ordering since `from_key`. @@ -395,8 +391,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = yield self.runInteraction("get_room_events_stream_for_room", f) - ret = yield self.get_events_as_list([ - r.event_id for r in rows], get_prev_content=True, + ret = yield self.get_events_as_list( + [r.event_id for r in rows], get_prev_content=True ) self._set_before_and_after(ret, rows, topo_order=from_id is None) @@ -446,7 +442,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = yield self.runInteraction("get_membership_changes_for_user", f) ret = yield self.get_events_as_list( - [r.event_id for r in rows], get_prev_content=True, + [r.event_id for r in rows], get_prev_content=True ) self._set_before_and_after(ret, rows, topo_order=False) @@ -592,8 +588,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) def get_max_topological_token(self, room_id, stream_key): + """Get the max topological token in a room before the given stream + ordering. + + Args: + room_id (str) + stream_key (int) + + Returns: + Deferred[int] + """ sql = ( - "SELECT max(topological_ordering) FROM events" + "SELECT coalesce(max(topological_ordering), 0) FROM events" " WHERE room_id = ? AND stream_ordering < ?" ) return self._execute( @@ -715,7 +721,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn, room_id, before_token, - direction='b', + direction="b", limit=before_limit, event_filter=event_filter, ) @@ -725,7 +731,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn, room_id, after_token, - direction='f', + direction="f", limit=after_limit, event_filter=event_filter, ) @@ -806,7 +812,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id, from_token, to_token=None, - direction='b', + direction="b", limit=-1, event_filter=None, ): @@ -836,7 +842,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. args = [False, room_id] - if direction == 'b': + if direction == "b": order = "DESC" else: order = "ASC" @@ -872,7 +878,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if rows: topo = rows[-1].topological_ordering toke = rows[-1].stream_ordering - if direction == 'b': + if direction == "b": # Tokens are positions between events. # This token points *after* the last event in the chunk. # We need it to point to the event before it in the chunk @@ -888,7 +894,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def paginate_room_events( - self, room_id, from_key, to_key=None, direction='b', limit=-1, event_filter=None + self, room_id, from_key, to_key=None, direction="b", limit=-1, event_filter=None ): """Returns list of events before or after a given token. |