diff options
-rw-r--r-- | synapse/storage/stream.py | 108 | ||||
-rw-r--r-- | synapse/types.py | 19 |
2 files changed, 84 insertions, 43 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index fb463c525a..387120090a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -62,24 +62,25 @@ _TOPOLOGICAL_TOKEN = "topological" # Used as return values for pagination APIs _EventDictReturn = namedtuple("_EventDictReturn", ( - "event_id", "topological_ordering", "stream_ordering", + "event_id", "chunk_id", "topological_ordering", "stream_ordering", )) def lower_bound(token, engine, inclusive=False): inclusive = "=" if inclusive else "" - if token.topological is None: + if token.chunk is None: return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering") else: if isinstance(engine, PostgresEngine): # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we # use the later form when running against postgres. - return "((%d,%d) <%s (%s,%s))" % ( - token.topological, token.stream, inclusive, + return "(chunk_id = %d AND (%d,%d) <%s (%s,%s))" % ( + token.chunk, token.topological, token.stream, inclusive, "topological_ordering", "stream_ordering", ) - return "(%d < %s OR (%d = %s AND %d <%s %s))" % ( + return "(chunk_id = %d AND (%d < %s OR (%d = %s AND %d <%s %s)))" % ( + token.chunk, token.topological, "topological_ordering", token.topological, "topological_ordering", token.stream, inclusive, "stream_ordering", @@ -88,18 +89,19 @@ def lower_bound(token, engine, inclusive=False): def upper_bound(token, engine, inclusive=True): inclusive = "=" if inclusive else "" - if token.topological is None: + if token.chunk is None: return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering") else: if isinstance(engine, PostgresEngine): # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we # use the later form when running against postgres. - return "((%d,%d) >%s (%s,%s))" % ( - token.topological, token.stream, inclusive, + return "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % ( + token.chunk, token.topological, token.stream, inclusive, "topological_ordering", "stream_ordering", ) - return "(%d > %s OR (%d = %s AND %d >%s %s))" % ( + return "(chunk_id = %d AND (%d > %s OR (%d = %s AND %d >%s %s)))" % ( + token.chunk, token.topological, "topological_ordering", token.topological, "topological_ordering", token.stream, inclusive, "stream_ordering", @@ -275,7 +277,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) % (order,) txn.execute(sql, (room_id, from_id, to_id, limit)) - rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] + rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn] return rows rows = yield self.runInteraction("get_room_events_stream_for_room", f) @@ -325,7 +327,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) txn.execute(sql, (user_id, from_id, to_id,)) - rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] + rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn] return rows @@ -437,15 +439,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): `room_id` causes it to return the current room specific topological token. """ - token = yield self.get_room_max_stream_ordering() if room_id is None: - defer.returnValue("s%d" % (token,)) + token = yield self.get_room_max_stream_ordering() + defer.returnValue(str(RoomStreamToken(None, None, token))) else: - topo = yield self.runInteraction( - "_get_max_topological_txn", self._get_max_topological_txn, + token = yield self.runInteraction( + "get_room_events_max_id", self._get_topological_token_for_room_txn, room_id, ) - defer.returnValue("t%d-%d" % (topo, token)) + if not token: + raise Exception("Server not in room") + defer.returnValue(str(token)) def get_stream_token_for_event(self, event_id): """The stream token for an event @@ -460,7 +464,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering", - ).addCallback(lambda row: "s%d" % (row,)) + ).addCallback(lambda row: str(RoomStreamToken(None, None, row))) def get_topological_token_for_event(self, event_id): """The stream token for an event @@ -469,16 +473,34 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Raises: StoreError if the event wasn't in the database. Returns: - A deferred "t%d-%d" topological token. + A deferred topological token. """ return self._simple_select_one( table="events", keyvalues={"event_id": event_id}, - retcols=("stream_ordering", "topological_ordering"), + retcols=("stream_ordering", "topological_ordering", "chunk_id"), desc="get_topological_token_for_event", - ).addCallback(lambda row: "t%d-%d" % ( - row["topological_ordering"], row["stream_ordering"],) - ) + ).addCallback(lambda row: str(RoomStreamToken( + row["chunk_id"], + row["topological_ordering"], + row["stream_ordering"], + ))) + + def _get_topological_token_for_room_txn(self, txn, room_id): + sql = """ + SELECT chunk_id, topological_ordering, stream_ordering + FROM events + NATURAL JOIN event_forward_extremities + WHERE room_id = ? + ORDER BY stream_ordering DESC + LIMIT 1 + """ + txn.execute(sql, (room_id,)) + row = txn.fetchone() + if row: + c, t, s = row + return RoomStreamToken(c, t, s) + return None def get_max_topological_token(self, room_id, stream_key): sql = ( @@ -515,18 +537,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): null topological_ordering. """ for event, row in zip(events, rows): + chunk = row.chunk_id + topo = row.topological_ordering stream = row.stream_ordering - if topo_order and row.topological_ordering: - topo = row.topological_ordering - else: - topo = None + internal = event.internal_metadata - internal.before = str(RoomStreamToken(topo, stream - 1)) - internal.after = str(RoomStreamToken(topo, stream)) - internal.order = ( - int(topo) if topo else 0, - int(stream), - ) + if topo_order and chunk: + internal.before = str(RoomStreamToken(chunk, topo, stream - 1)) + internal.after = str(RoomStreamToken(chunk, topo, stream)) + internal.order = ( + int(chunk) if chunk else 0, + int(topo) if topo else 0, + int(stream), + ) + else: + internal.before = str(RoomStreamToken(None, None, stream - 1)) + internal.after = str(RoomStreamToken(None, None, stream)) + internal.order = ( + 0, 0, int(stream), + ) @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): @@ -586,17 +615,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "event_id": event_id, "room_id": room_id, }, - retcols=["stream_ordering", "topological_ordering"], + retcols=["stream_ordering", "topological_ordering", "chunk_id"], ) # Paginating backwards includes the event at the token, but paginating # forward doesn't. before_token = RoomStreamToken( - results["topological_ordering"] - 1, - results["stream_ordering"], + results["chunk_id"], + results["topological_ordering"], + results["stream_ordering"] - 1, ) after_token = RoomStreamToken( + results["chunk_id"], results["topological_ordering"], results["stream_ordering"], ) @@ -728,7 +759,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): args.append(int(limit)) sql = ( - "SELECT event_id, topological_ordering, stream_ordering" + "SELECT event_id, chunk_id, topological_ordering, stream_ordering" " FROM events" " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," @@ -743,6 +774,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] if rows: + chunk = rows[-1].chunk_id topo = rows[-1].topological_ordering toke = rows[-1].stream_ordering if direction == 'b': @@ -752,12 +784,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # when we are going backwards so we subtract one from the # stream part. toke -= 1 - next_token = RoomStreamToken(topo, toke) + next_token = RoomStreamToken(chunk, topo, toke) else: # TODO (erikj): We should work out what to do here instead. next_token = to_token if to_token else from_token - return rows, str(next_token), + return rows, str(next_token), iterated_chunks, @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, diff --git a/synapse/types.py b/synapse/types.py index cc7c182a78..27f204ce79 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -306,7 +306,7 @@ StreamToken.START = StreamToken( ) -class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): +class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")): """Tokens are positions between events. The token "s1" comes after event 1. s0 s1 @@ -334,10 +334,17 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): def parse(cls, string): try: if string[0] == 's': - return cls(topological=None, stream=int(string[1:])) - if string[0] == 't': + return cls(chunk=None, topological=None, stream=int(string[1:])) + if string[0] == 't': # For backwards compat with older tokens. parts = string[1:].split('-', 1) - return cls(topological=int(parts[0]), stream=int(parts[1])) + return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1])) + if string[0] == 'c': + parts = string[1:].split('~', 2) + return cls( + chunk=int(parts[0]), + topological=int(parts[1]), + stream=int(parts[2]), + ) except Exception: pass raise SynapseError(400, "Invalid token %r" % (string,)) @@ -346,12 +353,14 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): def parse_stream_token(cls, string): try: if string[0] == 's': - return cls(topological=None, stream=int(string[1:])) + return cls(chunk=None, topological=None, stream=int(string[1:])) except Exception: pass raise SynapseError(400, "Invalid token %r" % (string,)) def __str__(self): + if self.chunk is not None: + return "c%d~%d~%d" % (self.chunk, self.topological, self.stream) if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) else: |