diff options
author | Mark Haines <mjark@negativecurvature.net> | 2015-05-12 15:20:32 +0100 |
---|---|---|
committer | Mark Haines <mjark@negativecurvature.net> | 2015-05-12 15:20:32 +0100 |
commit | 1fce36b11130eb0356c29a36d7f8fac1e0efce48 (patch) | |
tree | 249b8fc1067a0905ef17194279f2b0fc0940563a /synapse/storage | |
parent | Merge pull request #148 from matrix-org/bugs/SYN-377 (diff) | |
parent | Err, delete the right stuff (diff) | |
download | synapse-1fce36b11130eb0356c29a36d7f8fac1e0efce48.tar.xz |
Merge pull request #149 from matrix-org/erikj/backfill
Backfill support
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/event_federation.py | 66 | ||||
-rw-r--r-- | synapse/storage/events.py | 15 | ||||
-rw-r--r-- | synapse/storage/stream.py | 138 |
3 files changed, 103 insertions, 116 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 74b4e23590..a1982dfbb5 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -79,6 +79,28 @@ class EventFederationStore(SQLBaseStore): room_id, ) + def get_oldest_events_with_depth_in_room(self, room_id): + return self.runInteraction( + "get_oldest_events_with_depth_in_room", + self.get_oldest_events_with_depth_in_room_txn, + room_id, + ) + + def get_oldest_events_with_depth_in_room_txn(self, txn, room_id): + sql = ( + "SELECT b.event_id, MAX(e.depth) FROM events as e" + " INNER JOIN event_edges as g" + " ON g.event_id = e.event_id AND g.room_id = e.room_id" + " INNER JOIN event_backward_extremities as b" + " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id" + " WHERE b.room_id = ? AND g.is_state is ?" + " GROUP BY b.event_id" + ) + + txn.execute(sql, (room_id, False,)) + + return dict(txn.fetchall()) + def _get_oldest_events_in_room_txn(self, txn, room_id): return self._simple_select_onecol_txn( txn, @@ -247,11 +269,13 @@ class EventFederationStore(SQLBaseStore): do_insert = depth < min_depth if min_depth else True if do_insert: - self._simple_insert_txn( + self._simple_upsert_txn( txn, table="room_depth", - values={ + keyvalues={ "room_id": room_id, + }, + values={ "min_depth": depth, }, ) @@ -306,31 +330,27 @@ class EventFederationStore(SQLBaseStore): txn.execute(query, (event_id, room_id)) - # Insert all the prev_events as a backwards thing, they'll get - # deleted in a second if they're incorrect anyway. - self._simple_insert_many_txn( - txn, - table="event_backward_extremities", - values=[ - { - "event_id": e_id, - "room_id": room_id, - } - for e_id, _ in prev_events - ], + query = ( + "INSERT INTO event_backward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + " )" + " AND NOT EXISTS (" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?" + " )" ) - # Also delete from the backwards extremities table all ones that - # reference events that we have already seen + txn.executemany(query, [ + (e_id, room_id, e_id, room_id, e_id, room_id, ) + for e_id, _ in prev_events + ]) + query = ( - "DELETE FROM event_backward_extremities WHERE EXISTS (" - "SELECT 1 FROM events " - "WHERE " - "event_backward_extremities.event_id = events.event_id " - "AND not events.outlier " - ")" + "DELETE FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" ) - txn.execute(query) + txn.execute(query, (event_id, room_id)) txn.call_after( self.get_latest_event_ids_in_room.invalidate, room_id diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 626a5eaf6e..a5a6869079 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -135,19 +135,17 @@ class EventsStore(SQLBaseStore): outlier = event.internal_metadata.is_outlier() if not outlier: - self._store_state_groups_txn(txn, event, context) - self._update_min_depth_for_room_txn( txn, event.room_id, event.depth ) - have_persisted = self._simple_select_one_onecol_txn( + have_persisted = self._simple_select_one_txn( txn, - table="event_json", + table="events", keyvalues={"event_id": event.event_id}, - retcol="event_id", + retcols=["event_id", "outlier"], allow_none=True, ) @@ -162,7 +160,9 @@ class EventsStore(SQLBaseStore): # if we are persisting an event that we had persisted as an outlier, # but is no longer one. if have_persisted: - if not outlier: + if not outlier and have_persisted["outlier"]: + self._store_state_groups_txn(txn, event, context) + sql = ( "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?" @@ -182,6 +182,9 @@ class EventsStore(SQLBaseStore): ) return + if not outlier: + self._store_state_groups_txn(txn, event, context) + self._handle_prev_events( txn, outlier=outlier, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 280d4ad605..8045e17fd7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,11 +37,9 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.api.constants import EventTypes -from synapse.api.errors import SynapseError +from synapse.types import RoomStreamToken from synapse.util.logutils import log_function -from collections import namedtuple - import logging @@ -55,76 +53,26 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" -class _StreamToken(namedtuple("_StreamToken", "topological stream")): - """Tokens are positions between events. The token "s1" comes after event 1. - - s0 s1 - | | - [0] V [1] V [2] - - Tokens can either be a point in the live event stream or a cursor going - through historic events. - - When traversing the live event stream events are ordered by when they - arrived at the homeserver. - - When traversing historic events the events are ordered by their depth in - the event graph "topological_ordering" and then by when they arrived at the - homeserver "stream_ordering". - - Live tokens start with an "s" followed by the "stream_ordering" id of the - event it comes after. Historic tokens start with a "t" followed by the - "topological_ordering" id of the event it comes after, follewed by "-", - followed by the "stream_ordering" id of the event it comes after. - """ - __slots__ = [] - - @classmethod - def parse(cls, string): - try: - if string[0] == 's': - return cls(topological=None, stream=int(string[1:])) - if string[0] == 't': - parts = string[1:].split('-', 1) - return cls(topological=int(parts[0]), stream=int(parts[1])) - except: - pass - raise SynapseError(400, "Invalid token %r" % (string,)) - - @classmethod - def parse_stream_token(cls, string): - try: - if string[0] == 's': - return cls(topological=None, stream=int(string[1:])) - except: - pass - raise SynapseError(400, "Invalid token %r" % (string,)) - - def __str__(self): - if self.topological is not None: - return "t%d-%d" % (self.topological, self.stream) - else: - return "s%d" % (self.stream,) +def lower_bound(token): + if token.topological is None: + return "(%d < %s)" % (token.stream, "stream_ordering") + else: + return "(%d < %s OR (%d = %s AND %d < %s))" % ( + token.topological, "topological_ordering", + token.topological, "topological_ordering", + token.stream, "stream_ordering", + ) - def lower_bound(self): - if self.topological is None: - return "(%d < %s)" % (self.stream, "stream_ordering") - else: - return "(%d < %s OR (%d = %s AND %d < %s))" % ( - self.topological, "topological_ordering", - self.topological, "topological_ordering", - self.stream, "stream_ordering", - ) - def upper_bound(self): - if self.topological is None: - return "(%d >= %s)" % (self.stream, "stream_ordering") - else: - return "(%d > %s OR (%d = %s AND %d >= %s))" % ( - self.topological, "topological_ordering", - self.topological, "topological_ordering", - self.stream, "stream_ordering", - ) +def upper_bound(token): + if token.topological is None: + return "(%d >= %s)" % (token.stream, "stream_ordering") + else: + return "(%d > %s OR (%d = %s AND %d >= %s))" % ( + token.topological, "topological_ordering", + token.topological, "topological_ordering", + token.stream, "stream_ordering", + ) class StreamStore(SQLBaseStore): @@ -139,8 +87,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_id = _StreamToken.parse_stream_token(from_key) - to_id = _StreamToken.parse_stream_token(to_key) + from_id = RoomStreamToken.parse_stream_token(from_key) + to_id = RoomStreamToken.parse_stream_token(to_key) if from_key == to_key: defer.returnValue(([], to_key)) @@ -234,8 +182,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_id = _StreamToken.parse_stream_token(from_key) - to_id = _StreamToken.parse_stream_token(to_key) + from_id = RoomStreamToken.parse_stream_token(from_key) + to_id = RoomStreamToken.parse_stream_token(to_key) if from_key == to_key: return defer.succeed(([], to_key)) @@ -288,17 +236,17 @@ class StreamStore(SQLBaseStore): args = [False, room_id] if direction == 'b': order = "DESC" - bounds = _StreamToken.parse(from_key).upper_bound() + bounds = upper_bound(RoomStreamToken.parse(from_key)) if to_key: bounds = "%s AND %s" % ( - bounds, _StreamToken.parse(to_key).lower_bound() + bounds, lower_bound(RoomStreamToken.parse(to_key)) ) else: order = "ASC" - bounds = _StreamToken.parse(from_key).lower_bound() + bounds = lower_bound(RoomStreamToken.parse(from_key)) if to_key: bounds = "%s AND %s" % ( - bounds, _StreamToken.parse(to_key).upper_bound() + bounds, upper_bound(RoomStreamToken.parse(to_key)) ) if int(limit) > 0: @@ -333,7 +281,7 @@ class StreamStore(SQLBaseStore): # when we are going backwards so we subtract one from the # stream part. toke -= 1 - next_token = str(_StreamToken(topo, toke)) + next_token = str(RoomStreamToken(topo, toke)) else: # TODO (erikj): We should work out what to do here instead. next_token = to_key if to_key else from_key @@ -354,7 +302,7 @@ class StreamStore(SQLBaseStore): with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback - end_token = _StreamToken.parse_stream_token(end_token) + end_token = RoomStreamToken.parse_stream_token(end_token) if from_token is None: sql = ( @@ -365,7 +313,7 @@ class StreamStore(SQLBaseStore): " LIMIT ?" ) else: - from_token = _StreamToken.parse_stream_token(from_token) + from_token = RoomStreamToken.parse_stream_token(from_token) sql = ( "SELECT stream_ordering, topological_ordering, event_id" " FROM events" @@ -395,7 +343,7 @@ class StreamStore(SQLBaseStore): # stream part. topo = rows[0]["topological_ordering"] toke = rows[0]["stream_ordering"] - 1 - start_token = str(_StreamToken(topo, toke)) + start_token = str(RoomStreamToken(topo, toke)) token = (start_token, str(end_token)) else: @@ -416,9 +364,25 @@ class StreamStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_room_events_max_id(self): + def get_room_events_max_id(self, direction='f'): token = yield self._stream_id_gen.get_max_token(self) - defer.returnValue("s%d" % (token,)) + if direction != 'b': + defer.returnValue("s%d" % (token,)) + else: + topo = yield self.runInteraction( + "_get_max_topological_txn", self._get_max_topological_txn + ) + defer.returnValue("t%d-%d" % (topo, token)) + + def _get_max_topological_txn(self, txn): + txn.execute( + "SELECT MAX(topological_ordering) FROM events" + " WHERE outlier = ?", + (False,) + ) + + rows = txn.fetchall() + return rows[0][0] if rows else 0 @defer.inlineCallbacks def _get_min_token(self): @@ -439,5 +403,5 @@ class StreamStore(SQLBaseStore): stream = row["stream_ordering"] topo = event.depth internal = event.internal_metadata - internal.before = str(_StreamToken(topo, stream - 1)) - internal.after = str(_StreamToken(topo, stream)) + internal.before = str(RoomStreamToken(topo, stream - 1)) + internal.after = str(RoomStreamToken(topo, stream)) |