diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/events.py | 82 | ||||
-rw-r--r-- | synapse/storage/schema/delta/33/event_fields.py | 60 | ||||
-rw-r--r-- | synapse/storage/stream.py | 56 |
3 files changed, 197 insertions, 1 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9d74fd159d..6610549281 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -152,6 +152,7 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" + EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" def __init__(self, hs): super(EventsStore, self).__init__(hs) @@ -159,6 +160,10 @@ class EventsStore(SQLBaseStore): self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) + self.register_background_update_handler( + self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, + self._background_reindex_fields_sender, + ) self._event_persist_queue = _EventPeristenceQueue() @@ -576,6 +581,11 @@ class EventsStore(SQLBaseStore): "content": encode_json(event.content).decode("UTF-8"), "origin_server_ts": int(event.origin_server_ts), "received_ts": self._clock.time_msec(), + "sender": event.sender, + "contains_url": ( + "url" in event.content + and isinstance(event.content["url"], basestring) + ), } for event, _ in events_and_contexts ], @@ -1116,6 +1126,78 @@ class EventsStore(SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks + def _background_reindex_fields_sender(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + + def reindex_txn(txn): + sql = ( + "SELECT stream_ordering, event_id, json FROM events" + " INNER JOIN event_json USING (event_id)" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + min_stream_id = rows[-1][0] + + update_rows = [] + for row in rows: + try: + event_id = row[1] + event_json = json.loads(row[2]) + sender = event_json["sender"] + content = event_json["content"] + + contains_url = "url" in content + if contains_url: + contains_url &= isinstance(content["url"], basestring) + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + update_rows.append((sender, contains_url, event_id)) + + sql = ( + "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?" + ) + + for index in range(0, len(update_rows), INSERT_CLUMP_SIZE): + clump = update_rows[index:index + INSERT_CLUMP_SIZE] + txn.executemany(sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + "rows_inserted": rows_inserted + len(rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress + ) + + return len(rows) + + result = yield self.runInteraction( + self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn + ) + + if not result: + yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME) + + defer.returnValue(result) + + @defer.inlineCallbacks def _background_reindex_origin_server_ts(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] diff --git a/synapse/storage/schema/delta/33/event_fields.py b/synapse/storage/schema/delta/33/event_fields.py new file mode 100644 index 0000000000..83066cccc9 --- /dev/null +++ b/synapse/storage/schema/delta/33/event_fields.py @@ -0,0 +1,60 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.prepare_database import get_statements + +import logging +import ujson + +logger = logging.getLogger(__name__) + + +ALTER_TABLE = """ +ALTER TABLE events ADD COLUMN sender TEXT; +ALTER TABLE events ADD COLUMN contains_url BOOLEAN; +""" + + +def run_create(cur, database_engine, *args, **kwargs): + for statement in get_statements(ALTER_TABLE.splitlines()): + cur.execute(statement) + + cur.execute("SELECT MIN(stream_ordering) FROM events") + rows = cur.fetchall() + min_stream_id = rows[0][0] + + cur.execute("SELECT MAX(stream_ordering) FROM events") + rows = cur.fetchall() + max_stream_id = rows[0][0] + + if min_stream_id is not None and max_stream_id is not None: + progress = { + "target_min_stream_id_inclusive": min_stream_id, + "max_stream_id_exclusive": max_stream_id + 1, + "rows_inserted": 0, + } + progress_json = ujson.dumps(progress) + + sql = ( + "INSERT into background_updates (update_name, progress_json)" + " VALUES (?, ?)" + ) + + sql = database_engine.convert_param_style(sql) + + cur.execute(sql, ("event_fields_sender_url", progress_json)) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c33ac5a8d7..862c5c3ea1 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -95,6 +95,54 @@ def upper_bound(token, engine, inclusive=True): ) +def filter_to_clause(event_filter): + # NB: This may create SQL clauses that don't optimise well (and we don't + # have indices on all possible clauses). E.g. it may create + # "room_id == X AND room_id != X", which postgres doesn't optimise. + + if not event_filter: + return "", [] + + clauses = [] + args = [] + + if event_filter.types: + clauses.append( + "(%s)" % " OR ".join("type = ?" for _ in event_filter.types) + ) + args.extend(event_filter.types) + + for typ in event_filter.not_types: + clauses.append("type != ?") + args.append(typ) + + if event_filter.senders: + clauses.append( + "(%s)" % " OR ".join("sender = ?" for _ in event_filter.senders) + ) + args.extend(event_filter.senders) + + for sender in event_filter.not_senders: + clauses.append("sender != ?") + args.append(sender) + + if event_filter.rooms: + clauses.append( + "(%s)" % " OR ".join("room_id = ?" for _ in event_filter.rooms) + ) + args.extend(event_filter.rooms) + + for room_id in event_filter.not_rooms: + clauses.append("room_id != ?") + args.append(room_id) + + if event_filter.contains_url: + clauses.append("contains_url = ?") + args.append(event_filter.contains_url) + + return " AND ".join(clauses), args + + class StreamStore(SQLBaseStore): @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): @@ -320,7 +368,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, - direction='b', limit=-1): + direction='b', limit=-1, event_filter=None): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. @@ -344,6 +392,12 @@ class StreamStore(SQLBaseStore): RoomStreamToken.parse(to_key), self.database_engine )) + filter_clause, filter_args = filter_to_clause(event_filter) + + if filter_clause: + bounds += " AND " + filter_clause + args.extend(filter_args) + if int(limit) > 0: args.append(int(limit)) limit_str = " LIMIT ?" |