diff options
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r-- | synapse/storage/stream.py | 151 |
1 files changed, 73 insertions, 78 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3405cb365e..744c821dfe 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd +# Copyright 2014, 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -137,7 +137,6 @@ class StreamStore(SQLBaseStore): with_feedback=with_feedback, ) - @defer.inlineCallbacks @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): @@ -157,11 +156,6 @@ class StreamStore(SQLBaseStore): "WHERE m.user_id = ? " ) - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - if limit: limit = max(limit, MAX_STREAM_SIZE) else: @@ -172,38 +166,42 @@ class StreamStore(SQLBaseStore): to_id = _parse_stream_token(to_key) if from_key == to_key: - defer.returnValue(([], to_key)) - return + return defer.succeed(([], to_key)) sql = ( - "SELECT *, (%(redacted)s) AS redacted FROM events AS e WHERE " + "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " "(e.outlier = 0 AND (room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { - "redacted": del_sql, "current": current_room_membership_sql, "invites": membership_sql, "limit": limit } - rows = yield self._execute_and_decode( - sql, - user_id, user_id, from_id, to_id - ) + def f(txn): + txn.execute(sql, (user_id, user_id, from_id, to_id,)) - ret = yield self._parse_events(rows) + rows = self.cursor_to_dict(txn) - if rows: - key = "s%d" % max([r["stream_ordering"] for r in rows]) - else: - # Assume we didn't get anything because there was nothing to get. - key = to_key + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + if rows: + key = "s%d" % max([r["stream_ordering"] for r in rows]) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + return ret, key - defer.returnValue((ret, key)) + return self.runInteraction("get_room_events_stream", f) - @defer.inlineCallbacks @log_function def paginate_room_events(self, room_id, from_key, to_key=None, direction='b', limit=-1, @@ -221,7 +219,9 @@ class StreamStore(SQLBaseStore): bounds = _get_token_bound(from_key, from_comp) if to_key: - bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp)) + bounds = "%s AND %s" % ( + bounds, _get_token_bound(to_key, to_comp) + ) if int(limit) > 0: args.append(int(limit)) @@ -229,87 +229,82 @@ class StreamStore(SQLBaseStore): else: limit_str = "" - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = events.event_id " - "LIMIT 1" - ) - sql = ( - "SELECT *, (%(redacted)s) AS redacted FROM events" + "SELECT * FROM events" " WHERE outlier = 0 AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" ) % { - "redacted": del_sql, "bounds": bounds, "order": order, "limit": limit_str } - rows = yield self._execute_and_decode( - sql, - *args - ) - - if rows: - topo = rows[-1]["topological_ordering"] - toke = rows[-1]["stream_ordering"] - if direction == 'b': - topo -= 1 - toke -= 1 - next_token = "t%s-%s" % (topo, toke) - else: - # TODO (erikj): We should work out what to do here instead. - next_token = to_key if to_key else from_key + def f(txn): + txn.execute(sql, args) + + rows = self.cursor_to_dict(txn) + + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + if direction == 'b': + topo -= 1 + toke -= 1 + next_token = "t%s-%s" % (topo, toke) + else: + # TODO (erikj): We should work out what to do here instead. + next_token = to_key if to_key else from_key + + events = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) - events = yield self._parse_events(rows) + return events, next_token, - defer.returnValue( - ( - events, - next_token - ) - ) + return self.runInteraction("paginate_room_events", f) - @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, with_feedback=False): # TODO (erikj): Handle compressed feedback - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = events.event_id " - "LIMIT 1" - ) - sql = ( - "SELECT *, (%(redacted)s) AS redacted FROM events " + "SELECT stream_ordering, topological_ordering, event_id FROM events " "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) % { - "redacted": del_sql, - } - - rows = yield self._execute_and_decode( - sql, - room_id, end_token, limit ) - rows.reverse() # As we selected with reverse ordering + def f(txn): + txn.execute(sql, (room_id, end_token, limit,)) - if rows: - topo = rows[0]["topological_ordering"] - toke = rows[0]["stream_ordering"] - start_token = "t%s-%s" % (topo, toke) + rows = self.cursor_to_dict(txn) - token = (start_token, end_token) - else: - token = (end_token, end_token) + rows.reverse() # As we selected with reverse ordering - events = yield self._parse_events(rows) + if rows: + # XXX: Always subtract 1 since the start token always goes + # backwards (parity with paginate_room_events). It isn't + # obvious that this is correct; we should clarify the algorithm + # used here. + topo = rows[0]["topological_ordering"] - 1 + toke = rows[0]["stream_ordering"] - 1 + start_token = "t%s-%s" % (topo, toke) + + token = (start_token, end_token) + else: + token = (end_token, end_token) + + events = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) - ret = (events, token) + return events, token - defer.returnValue(ret) + return self.runInteraction("get_recent_events_for_room", f) def get_room_events_max_id(self): return self.runInteraction( |