diff options
author | Richard van der Hoff <richard@matrix.org> | 2016-03-03 11:39:25 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2016-03-03 11:39:25 +0000 |
commit | 79f34bdbc2720a832cdab36ba58e83201e549c41 (patch) | |
tree | 2bdadb2bd2d2dac1a6722c4d33ca81febb7d7ece /synapse/storage/events.py | |
parent | Fix typo (diff) | |
parent | jenkins.sh: set -x (diff) | |
download | synapse-79f34bdbc2720a832cdab36ba58e83201e549c41.tar.xz |
Merge branch 'develop' into rav/SYN-642
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 51 |
1 files changed, 48 insertions, 3 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1dd3236829..60936500d8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -75,8 +75,8 @@ class EventsStore(SQLBaseStore): yield stream_orderings stream_ordering_manager = stream_ordering_manager() else: - stream_ordering_manager = yield self._stream_id_gen.get_next_mult( - self, len(events_and_contexts) + stream_ordering_manager = self._stream_id_gen.get_next_mult( + len(events_and_contexts) ) with stream_ordering_manager as stream_orderings: @@ -109,7 +109,7 @@ class EventsStore(SQLBaseStore): stream_ordering = self.min_stream_token if stream_ordering is None: - stream_ordering_manager = yield self._stream_id_gen.get_next(self) + stream_ordering_manager = self._stream_id_gen.get_next() else: @contextmanager def stream_ordering_manager(): @@ -1064,3 +1064,48 @@ class EventsStore(SQLBaseStore): yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME) defer.returnValue(result) + + def get_current_backfill_token(self): + """The current minimum token that backfilled events have reached""" + + # TODO: Fix race with the persit_event txn by using one of the + # stream id managers + return -self.min_stream_token + + def get_all_new_events(self, last_backfill_id, last_forward_id, + current_backfill_id, current_forward_id, limit): + """Get all the new events that have arrived at the server either as + new events or as backfilled events""" + def get_all_new_events_txn(txn): + sql = ( + "SELECT e.stream_ordering, ej.internal_metadata, ej.json" + " FROM events as e" + " JOIN event_json as ej" + " ON e.event_id = ej.event_id AND e.room_id = ej.room_id" + " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + if last_forward_id != current_forward_id: + txn.execute(sql, (last_forward_id, current_forward_id, limit)) + new_forward_events = txn.fetchall() + else: + new_forward_events = [] + + sql = ( + "SELECT -e.stream_ordering, ej.internal_metadata, ej.json" + " FROM events as e" + " JOIN event_json as ej" + " ON e.event_id = ej.event_id AND e.room_id = ej.room_id" + " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?" + " ORDER BY e.stream_ordering DESC" + " LIMIT ?" + ) + if last_backfill_id != current_backfill_id: + txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit)) + new_backfill_events = txn.fetchall() + else: + new_backfill_events = [] + + return (new_forward_events, new_backfill_events) + return self.runInteraction("get_all_new_events", get_all_new_events_txn) |