diff options
Diffstat (limited to 'synapse/storage')
26 files changed, 1246 insertions, 140 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6c32773f25..9996f195a0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -111,7 +111,10 @@ class DataStore(RoomMemberStore, RoomStore, db_conn, "presence_stream", "stream_id" ) self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_inbox", "stream_id" + db_conn, "device_max_stream_id", "stream_id" + ) + self._public_room_id_gen = StreamIdGenerator( + db_conn, "public_room_list_stream", "stream_id" ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") @@ -182,6 +185,30 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( + db_conn, "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + # The federation outbox and the local device inbox uses the same + # stream_id generator. 
+ device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( + db_conn, "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", @@ -195,6 +222,8 @@ class DataStore(RoomMemberStore, RoomStore, self._find_stream_orderings_for_times, 60 * 60 * 1000 ) + self._stream_order_on_start = self.get_room_max_stream_ordering() + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 30d0e4c5dc..94b2bcc54a 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -133,10 +133,12 @@ class BackgroundUpdateStore(SQLBaseStore): updates = yield self._simple_select_list( "background_updates", keyvalues=None, - retcols=("update_name",), + retcols=("update_name", "depends_on"), ) + in_flight = set(update["update_name"] for update in updates) for update in updates: - self._background_update_queue.append(update['update_name']) + if update["depends_on"] not in in_flight: + self._background_update_queue.append(update['update_name']) if not self._background_update_queue: # no work left to do @@ -217,7 +219,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_handlers[update_name] = update_handler def register_background_index_update(self, update_name, index_name, - table, columns): + table, columns, where_clause=None): """Helper for store classes to do a background index addition To use: @@ -241,14 +243,20 @@ class BackgroundUpdateStore(SQLBaseStore): conc = True else: conc = False - - sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ - % { - "conc": 
"CONCURRENTLY" if conc else "", - "name": index_name, - "table": table, - "columns": ", ".join(columns), - } + # We don't use partial indices on SQLite as it wasn't introduced + # until 3.8, and wheezy has 3.7 + where_clause = None + + sql = ( + "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" + " %(where_clause)s" + ) % { + "conc": "CONCURRENTLY" if conc else "", + "name": index_name, + "table": table, + "columns": ", ".join(columns), + "where_clause": "WHERE " + where_clause if where_clause else "" + } def create_index_concurrently(conn): conn.rollback() diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 68116b0394..f640e73714 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -27,63 +27,172 @@ logger = logging.getLogger(__name__) class DeviceInboxStore(SQLBaseStore): @defer.inlineCallbacks - def add_messages_to_device_inbox(self, messages_by_user_then_device): - """ + def add_messages_to_device_inbox(self, local_messages_by_user_then_device, + remote_messages_by_destination): + """Used to send messages from this server. + Args: - messages_by_user_and_device(dict): + sender_user_id(str): The ID of the user sending these messages. + local_messages_by_user_and_device(dict): Dictionary of user_id to device_id to message. + remote_messages_by_destination(dict): + Dictionary of destination server_name to the EDU JSON to send. Returns: A deferred stream_id that resolves when the messages have been inserted. """ - def select_devices_txn(txn, user_id, devices): - if not devices: - return [] - sql = ( - "SELECT user_id, device_id FROM devices" - " WHERE user_id = ? AND device_id IN (" - + ",".join("?" * len(devices)) - + ")" + def add_messages_txn(txn, now_ms, stream_id): + # Add the local messages directly to the local inbox. 
+ self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device ) - # TODO: Maybe this needs to be done in batches if there are - # too many local devices for a given user. - args = [user_id] + devices - txn.execute(sql, args) - return [tuple(row) for row in txn.fetchall()] - - def add_messages_to_device_inbox_txn(txn, stream_id): - local_users_and_devices = set() - for user_id, messages_by_device in messages_by_user_then_device.items(): - local_users_and_devices.update( - select_devices_txn(txn, user_id, messages_by_device.keys()) - ) + # Add the remote messages to the federation outbox. + # We'll send them to a remote server when we next send a + # federation transaction to that destination. sql = ( - "INSERT INTO device_inbox" - " (user_id, device_id, stream_id, message_json)" + "INSERT INTO device_federation_outbox" + " (destination, stream_id, queued_ts, messages_json)" " VALUES (?,?,?,?)" ) rows = [] - for user_id, messages_by_device in messages_by_user_then_device.items(): - for device_id, message in messages_by_device.items(): - message_json = ujson.dumps(message) - # Only insert into the local inbox if the device exists on - # this server - if (user_id, device_id) in local_users_and_devices: - rows.append((user_id, device_id, stream_id, message_json)) - + for destination, edu in remote_messages_by_destination.items(): + edu_json = ujson.dumps(edu) + rows.append((destination, stream_id, now_ms, edu_json)) txn.executemany(sql, rows) with self._device_inbox_id_gen.get_next() as stream_id: + now_ms = self.clock.time_msec() yield self.runInteraction( "add_messages_to_device_inbox", - add_messages_to_device_inbox_txn, - stream_id + add_messages_txn, + now_ms, + stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + for destination in remote_messages_by_destination.keys(): + 
+ self._device_federation_outbox_stream_cache.entity_has_changed( + destination, stream_id + ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) + @defer.inlineCallbacks + def add_messages_from_remote_to_device_inbox( + self, origin, message_id, local_messages_by_user_then_device + ): + def add_messages_txn(txn, now_ms, stream_id): + # Check if we've already inserted a matching message_id for that + # origin. This can happen if the origin doesn't receive our + # acknowledgement from the first time we received the message. + already_inserted = self._simple_select_one_txn( + txn, table="device_federation_inbox", + keyvalues={"origin": origin, "message_id": message_id}, + retcols=("message_id",), + allow_none=True, + ) + if already_inserted is not None: + return + + # Add an entry for this message_id so that we know we've processed + # it. + self._simple_insert_txn( + txn, table="device_federation_inbox", + values={ + "origin": origin, + "message_id": message_id, + "received_ts": now_ms, + }, + ) + + # Add the messages to the appropriate local device inboxes so that + # they'll be sent to the devices when they next sync. + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + + with self._device_inbox_id_gen.get_next() as stream_id: + now_ms = self.clock.time_msec() + yield self.runInteraction( + "add_messages_from_remote_to_device_inbox", + add_messages_txn, + now_ms, + stream_id, + ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + + defer.returnValue(stream_id) + + def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, + messages_by_user_then_device): + sql = ( + "UPDATE device_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" 
+ ) + txn.execute(sql, (stream_id, stream_id)) + + local_by_user_then_device = {} + for user_id, messages_by_device in messages_by_user_then_device.items(): + messages_json_for_user = {} + devices = messages_by_device.keys() + if len(devices) == 1 and devices[0] == "*": + # Handle wildcard device_ids. + sql = ( + "SELECT device_id FROM devices" + " WHERE user_id = ?" + ) + txn.execute(sql, (user_id,)) + message_json = ujson.dumps(messages_by_device["*"]) + for row in txn.fetchall(): + # Add the message for all devices for this user on this + # server. + device = row[0] + messages_json_for_user[device] = message_json + else: + if not devices: + continue + sql = ( + "SELECT device_id FROM devices" + " WHERE user_id = ? AND device_id IN (" + + ",".join("?" * len(devices)) + + ")" + ) + # TODO: Maybe this needs to be done in batches if there are + # too many local devices for a given user. + txn.execute(sql, [user_id] + devices) + for row in txn.fetchall(): + # Only insert into the local inbox if the device exists on + # this server + device = row[0] + message_json = ujson.dumps(messages_by_device[device]) + messages_json_for_user[device] = message_json + + if messages_json_for_user: + local_by_user_then_device[user_id] = messages_json_for_user + + if not local_by_user_then_device: + return + + sql = ( + "INSERT INTO device_inbox" + " (user_id, device_id, stream_id, message_json)" + " VALUES (?,?,?,?)" + ) + rows = [] + for user_id, messages_by_device in local_by_user_then_device.items(): + for device_id, message_json in messages_by_device.items(): + rows.append((user_id, device_id, stream_id, message_json)) + + txn.executemany(sql, rows) + def get_new_messages_for_device( self, user_id, device_id, last_stream_id, current_stream_id, limit=100 ): @@ -97,6 +206,12 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. 
""" + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_device_txn(txn): sql = ( "SELECT stream_id, message_json FROM device_inbox" @@ -182,3 +297,71 @@ class DeviceInboxStore(SQLBaseStore): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() + + def get_new_device_msgs_for_remote( + self, destination, last_stream_id, current_stream_id, limit=100 + ): + """ + Args: + destination(str): The name of the remote server. + last_stream_id(int): The last position of the device message stream + that the server sent up to. + current_stream_id(int): The current position of the device + message stream. + Returns: + Deferred ([dict], int): List of messages for the device and where + in the stream the messages got to. + """ + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed or last_stream_id == current_stream_id: + return defer.succeed(([], current_stream_id)) + + def get_new_messages_for_remote_destination_txn(txn): + sql = ( + "SELECT stream_id, messages_json FROM device_federation_outbox" + " WHERE destination = ?" + " AND ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, ( + destination, last_stream_id, current_stream_id, limit + )) + messages = [] + for row in txn.fetchall(): + stream_pos = row[0] + messages.append(ujson.loads(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return (messages, stream_pos) + + return self.runInteraction( + "get_new_device_msgs_for_remote", + get_new_messages_for_remote_destination_txn, + ) + + def delete_device_msgs_for_remote(self, destination, up_to_stream_id): + """Used to delete messages when the remote destination acknowledges + their receipt. 
+ + Args: + destination(str): The destination server_name + up_to_stream_id(int): Where to delete messages up to. + Returns: + A deferred that resolves when the messages have been deleted. + """ + def delete_messages_for_remote_destination_txn(txn): + sql = ( + "DELETE FROM device_federation_outbox" + " WHERE destination = ?" + " AND stream_id <= ?" + ) + txn.execute(sql, (destination, up_to_stream_id)) + + return self.runInteraction( + "delete_device_msgs_for_remote", + delete_messages_for_remote_destination_txn + ) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index afd6530cab..17920d4480 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -54,8 +54,12 @@ class DeviceStore(SQLBaseStore): or_ignore=ignore_if_known, ) except Exception as e: - logger.error("store_device with device_id=%s failed: %s", - device_id, e) + logger.error("store_device with device_id=%s(%r) user_id=%s(%r)" + " display_name=%s(%r) failed: %s", + type(device_id).__name__, device_id, + type(user_id).__name__, user_id, + type(initial_device_display_name).__name__, + initial_device_display_name, e) raise StoreError(500, "Problem storing device.") def get_device(self, user_id, device_id): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 0827946207..3d62451de9 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -16,6 +16,7 @@ from twisted.internet import defer from ._base import SQLBaseStore +from synapse.api.errors import StoreError from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 @@ -36,6 +37,13 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. 
""" + def __init__(self, hs): + super(EventFederationStore, self).__init__(hs) + + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) + def get_auth_chain(self, event_ids): return self.get_auth_chain_ids(event_ids).addCallback(self._get_events) @@ -270,6 +278,37 @@ class EventFederationStore(SQLBaseStore): ] ) + # We now insert into stream_ordering_to_exterm a mapping from room_id, + # new stream_ordering to new forward extremeties in the room. + # This allows us to later efficiently look up the forward extremeties + # for a room before a given stream_ordering + max_stream_ord = max( + ev.internal_metadata.stream_ordering for ev in events + ) + new_extrem = {} + for room_id in events_by_room: + event_ids = self._simple_select_onecol_txn( + txn, + table="event_forward_extremities", + keyvalues={"room_id": room_id}, + retcol="event_id", + ) + new_extrem[room_id] = event_ids + + self._simple_insert_many_txn( + txn, + table="stream_ordering_to_exterm", + values=[ + { + "room_id": room_id, + "event_id": event_id, + "stream_ordering": max_stream_ord, + } + for room_id, extrem_evs in new_extrem.items() + for event_id in extrem_evs + ] + ) + query = ( "INSERT INTO event_backward_extremities (event_id, room_id)" " SELECT ?, ? WHERE NOT EXISTS (" @@ -305,6 +344,76 @@ class EventFederationStore(SQLBaseStore): self.get_latest_event_ids_in_room.invalidate, (room_id,) ) + def get_forward_extremeties_for_room(self, room_id, stream_ordering): + # We want to make the cache more effective, so we clamp to the last + # change before the given ordering. + last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) + + # We don't always have a full stream_to_exterm_id table, e.g. 
after + # the upgrade that introduced it, so we make sure we never + # try and pin to a stream_ordering from before a restart + last_change = max(self._stream_order_on_start, last_change) + + if last_change > self.stream_ordering_month_ago: + stream_ordering = min(last_change, stream_ordering) + + return self._get_forward_extremeties_for_room(room_id, stream_ordering) + + @cached(max_entries=5000, num_args=2) + def _get_forward_extremeties_for_room(self, room_id, stream_ordering): + """For a given room_id and stream_ordering, return the forward + extremeties of the room at that point in "time". + + Throws a StoreError if we have since purged the index for + stream_orderings from that point. + """ + + if stream_ordering <= self.stream_ordering_month_ago: + raise StoreError(400, "stream_ordering too old") + + sql = (""" + SELECT event_id FROM stream_ordering_to_exterm + INNER JOIN ( + SELECT room_id, MAX(stream_ordering) AS stream_ordering + FROM stream_ordering_to_exterm + WHERE stream_ordering <= ? GROUP BY room_id + ) AS rms USING (room_id, stream_ordering) + WHERE room_id = ? + """) + + def get_forward_extremeties_for_room_txn(txn): + txn.execute(sql, (stream_ordering, room_id)) + rows = txn.fetchall() + return [event_id for event_id, in rows] + + return self.runInteraction( + "get_forward_extremeties_for_room", + get_forward_extremeties_for_room_txn + ) + + def _delete_old_forward_extrem_cache(self): + def _delete_old_forward_extrem_cache_txn(txn): + # Delete entries older than a month, while making sure we don't delete + # the only entries for a room. + sql = (""" + DELETE FROM stream_ordering_to_exterm + WHERE + ( + SELECT max(stream_ordering) AS stream_ordering + FROM stream_ordering_to_exterm + WHERE room_id = stream_ordering_to_exterm.room_id + ) > ? + AND stream_ordering < ? 
+ """) + txn.execute( + sql, + (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) + ) + return self.runInteraction( + "_delete_old_forward_extrem_cache", + _delete_old_forward_extrem_cache_txn + ) + def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and including) the events in event_list. Return a list of max size `limit` diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index eb15fb751b..9cd923eb93 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -26,10 +26,19 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): + EPA_HIGHLIGHT_INDEX = "epa_highlight_index" + def __init__(self, hs): self.stream_ordering_month_ago = None super(EventPushActionsStore, self).__init__(hs) + self.register_background_index_update( + self.EPA_HIGHLIGHT_INDEX, + index_name="event_push_actions_u_highlight", + table="event_push_actions", + columns=["user_id", "stream_ordering"], + ) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -338,20 +347,29 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue(notifs[:limit]) @defer.inlineCallbacks - def get_push_actions_for_user(self, user_id, before=None, limit=50): + def get_push_actions_for_user(self, user_id, before=None, limit=50, + only_highlight=False): def f(txn): before_clause = "" if before: - before_clause = "AND stream_ordering < ?" + before_clause = "AND epa.stream_ordering < ?" args = [user_id, before, limit] else: args = [user_id, limit] + + if only_highlight: + if len(before_clause) > 0: + before_clause += " " + before_clause += "AND epa.highlight = 1" + + # NB. 
This assumes event_ids are globally unique since + # it makes the query easier to index sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," " epa.actions, epa.profile_tag, e.received_ts" " FROM event_push_actions epa, events e" - " WHERE epa.room_id = e.room_id AND epa.event_id = e.event_id" + " WHERE epa.event_id = e.event_id" " AND epa.user_id = ? %s" " ORDER BY epa.stream_ordering DESC" " LIMIT ?" diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1a7d4c5199..6dc46fa50f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -189,6 +189,14 @@ class EventsStore(SQLBaseStore): self._background_reindex_fields_sender, ) + self.register_background_index_update( + "event_contains_url_index", + index_name="event_contains_url_index", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering"], + where_clause="contains_url = true AND outlier = false", + ) + self._event_persist_queue = _EventPeristenceQueue() def persist_events(self, events_and_contexts, backfilled=False): @@ -497,7 +505,11 @@ class EventsStore(SQLBaseStore): # insert into the state_group, state_groups_state and # event_to_state_groups tables. 
- self._store_mult_state_groups_txn(txn, ((event, context),)) + try: + self._store_mult_state_groups_txn(txn, ((event, context),)) + except Exception: + logger.exception("") + raise metadata_json = encode_json( event.internal_metadata.get_dict() @@ -1543,6 +1555,9 @@ class EventsStore(SQLBaseStore): ) event_rows = txn.fetchall() + for event_id, state_key in event_rows: + txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( @@ -1582,7 +1597,66 @@ class EventsStore(SQLBaseStore): " GROUP BY state_group HAVING MAX(topological_ordering) < ?", (room_id, topological_ordering, topological_ordering) ) + state_rows = txn.fetchall() + state_groups_to_delete = [sg for sg, in state_rows] + + # Now we get all the state groups that rely on these state groups + new_state_edges = [] + chunks = [ + state_groups_to_delete[i:i + 100] + for i in xrange(0, len(state_groups_to_delete), 100) + ] + for chunk in chunks: + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=chunk, + retcols=["state_group"], + keyvalues={}, + ) + new_state_edges.extend(row["state_group"] for row in rows) + + # Now we turn the state groups that reference to-be-deleted state groups + # to non delta versions. 
+ for new_state_edge in new_state_edges: + curr_state = self._get_state_groups_from_groups_txn( + txn, [new_state_edge], types=None + ) + curr_state = curr_state[new_state_edge] + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": new_state_edge, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in curr_state.items() + ], + ) + txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", state_rows diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index b94ce7bea1..7efbe51cda 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 34 +SCHEMA_VERSION = 35 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -242,7 +242,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module = imp.load_source( module_name, absolute_path, python_file ) - logger.debug("Running script %s", relative_path) + logger.info("Running script %s", relative_path) module.run_create(cur, database_engine) if not is_empty: module.run_upgrade(cur, database_engine, config=config) @@ -253,7 +253,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, pass elif ext == ".sql": # A plain old .sql file, just read and execute it - logger.debug("Applying schema %s", relative_path) + logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) else: # Not a valid delta file. 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 8251f58670..2ef13d7403 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -48,15 +48,31 @@ class RoomStore(SQLBaseStore): StoreError if the room could not be stored. """ try: - yield self._simple_insert( - "rooms", - { - "room_id": room_id, - "creator": room_creator_user_id, - "is_public": is_public, - }, - desc="store_room", - ) + def store_room_txn(txn, next_id): + self._simple_insert_txn( + txn, + "rooms", + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + }, + ) + if is_public: + self._simple_insert_txn( + txn, + table="public_room_list_stream", + values={ + "stream_id": next_id, + "room_id": room_id, + "visibility": is_public, + } + ) + with self._public_room_id_gen.get_next() as next_id: + yield self.runInteraction( + "store_room_txn", + store_room_txn, next_id, + ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") @@ -77,13 +93,45 @@ class RoomStore(SQLBaseStore): allow_none=True, ) + @defer.inlineCallbacks def set_room_is_public(self, room_id, is_public): - return self._simple_update_one( - table="rooms", - keyvalues={"room_id": room_id}, - updatevalues={"is_public": is_public}, - desc="set_room_is_public", - ) + def set_room_is_public_txn(txn, next_id): + self._simple_update_one_txn( + txn, + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"is_public": is_public}, + ) + + entries = self._simple_select_list_txn( + txn, + table="public_room_list_stream", + keyvalues={"room_id": room_id}, + retcols=("stream_id", "visibility"), + ) + + entries.sort(key=lambda r: r["stream_id"]) + + add_to_stream = True + if entries: + add_to_stream = bool(entries[-1]["visibility"]) != is_public + + if add_to_stream: + self._simple_insert_txn( + txn, + table="public_room_list_stream", + values={ + "stream_id": next_id, + "room_id": room_id, + 
"visibility": is_public, + } + ) + + with self._public_room_id_gen.get_next() as next_id: + yield self.runInteraction( + "set_room_is_public", + set_room_is_public_txn, next_id, + ) def get_public_room_ids(self): return self._simple_select_onecol( @@ -207,3 +255,71 @@ class RoomStore(SQLBaseStore): }, desc="add_event_report" ) + + def get_current_public_room_stream_id(self): + return self._public_room_id_gen.get_current_token() + + def get_public_room_ids_at_stream_id(self, stream_id): + return self.runInteraction( + "get_public_room_ids_at_stream_id", + self.get_public_room_ids_at_stream_id_txn, stream_id + ) + + def get_public_room_ids_at_stream_id_txn(self, txn, stream_id): + return { + rm + for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items() + if vis + } + + def get_published_at_stream_id_txn(self, txn, stream_id): + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream + INNER JOIN ( + SELECT room_id, max(stream_id) AS stream_id + FROM public_room_list_stream + WHERE stream_id <= ? 
+ GROUP BY room_id + ) grouped USING (room_id, stream_id) + """) + + txn.execute(sql, (stream_id,)) + return dict(txn.fetchall()) + + def get_public_room_changes(self, prev_stream_id, new_stream_id): + def get_public_room_changes_txn(txn): + then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id) + + now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id) + + now_rooms_visible = set( + rm for rm, vis in now_rooms_dict.items() if vis + ) + now_rooms_not_visible = set( + rm for rm, vis in now_rooms_dict.items() if not vis + ) + + newly_visible = now_rooms_visible - then_rooms + newly_unpublished = now_rooms_not_visible & then_rooms + + return newly_visible, newly_unpublished + + return self.runInteraction( + "get_public_room_changes", get_public_room_changes_txn + ) + + def get_all_new_public_rooms(self, prev_id, current_id, limit): + def get_all_new_public_rooms(txn): + sql = (""" + SELECT stream_id, room_id, visibility FROM public_room_list_stream + WHERE stream_id > ? AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """) + + txn.execute(sql, (prev_id, current_id, limit,)) + return txn.fetchall() + + return self.runInteraction( + "get_all_new_public_rooms", get_all_new_public_rooms + ) diff --git a/synapse/storage/schema/delta/22/receipts_index.sql b/synapse/storage/schema/delta/22/receipts_index.sql index 7bc061dff6..bfc0b3bcaa 100644 --- a/synapse/storage/schema/delta/22/receipts_index.sql +++ b/synapse/storage/schema/delta/22/receipts_index.sql @@ -13,6 +13,10 @@ * limitations under the License. 
*/ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id ); diff --git a/synapse/storage/schema/delta/28/events_room_stream.sql b/synapse/storage/schema/delta/28/events_room_stream.sql index 200c35e6e2..36609475f1 100644 --- a/synapse/storage/schema/delta/28/events_room_stream.sql +++ b/synapse/storage/schema/delta/28/events_room_stream.sql @@ -13,4 +13,8 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX events_room_stream on events(room_id, stream_ordering); diff --git a/synapse/storage/schema/delta/28/public_roms_index.sql b/synapse/storage/schema/delta/28/public_roms_index.sql index ba62a974a4..6c1fd68c5b 100644 --- a/synapse/storage/schema/delta/28/public_roms_index.sql +++ b/synapse/storage/schema/delta/28/public_roms_index.sql @@ -13,4 +13,8 @@ * limitations under the License. */ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX public_room_index on rooms(is_public); diff --git a/synapse/storage/schema/delta/28/receipts_user_id_index.sql b/synapse/storage/schema/delta/28/receipts_user_id_index.sql index 452a1b3c6c..cb84c69baa 100644 --- a/synapse/storage/schema/delta/28/receipts_user_id_index.sql +++ b/synapse/storage/schema/delta/28/receipts_user_id_index.sql @@ -13,6 +13,10 @@ * limitations under the License. 
*/ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id ); diff --git a/synapse/storage/schema/delta/29/push_actions.sql b/synapse/storage/schema/delta/29/push_actions.sql index 7e7b09820a..84b21cf813 100644 --- a/synapse/storage/schema/delta/29/push_actions.sql +++ b/synapse/storage/schema/delta/29/push_actions.sql @@ -26,6 +26,10 @@ UPDATE event_push_actions SET stream_ordering = ( UPDATE event_push_actions SET notif = 1, highlight = 0; +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering ); diff --git a/synapse/storage/schema/delta/31/pushers_index.sql b/synapse/storage/schema/delta/31/pushers_index.sql index 9027bccc69..a82add88fd 100644 --- a/synapse/storage/schema/delta/31/pushers_index.sql +++ b/synapse/storage/schema/delta/31/pushers_index.sql @@ -13,6 +13,10 @@ * limitations under the License. 
*/ +/** Using CREATE INDEX directly is deprecated in favour of using background + * update see synapse/storage/schema/delta/33/access_tokens_device_index.sql + * and synapse/storage/registration.py for an example using + * "access_tokens_device_index" **/ CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id ); diff --git a/synapse/storage/schema/delta/35/add_state_index.sql b/synapse/storage/schema/delta/35/add_state_index.sql new file mode 100644 index 0000000000..0fce26345b --- /dev/null +++ b/synapse/storage/schema/delta/35/add_state_index.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +ALTER TABLE background_updates ADD COLUMN depends_on TEXT; + +INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ('state_group_state_type_index', '{}', 'state_group_state_deduplication'); diff --git a/synapse/storage/schema/delta/35/contains_url.sql b/synapse/storage/schema/delta/35/contains_url.sql new file mode 100644 index 0000000000..6cd123027b --- /dev/null +++ b/synapse/storage/schema/delta/35/contains_url.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT into background_updates (update_name, progress_json) + VALUES ('event_contains_url_index', '{}'); diff --git a/synapse/storage/schema/delta/35/device_outbox.sql b/synapse/storage/schema/delta/35/device_outbox.sql new file mode 100644 index 0000000000..17e6c43105 --- /dev/null +++ b/synapse/storage/schema/delta/35/device_outbox.sql @@ -0,0 +1,39 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +DROP TABLE IF EXISTS device_federation_outbox; +CREATE TABLE device_federation_outbox ( + destination TEXT NOT NULL, + stream_id BIGINT NOT NULL, + queued_ts BIGINT NOT NULL, + messages_json TEXT NOT NULL +); + + +DROP INDEX IF EXISTS device_federation_outbox_destination_id; +CREATE INDEX device_federation_outbox_destination_id + ON device_federation_outbox(destination, stream_id); + + +DROP TABLE IF EXISTS device_federation_inbox; +CREATE TABLE device_federation_inbox ( + origin TEXT NOT NULL, + message_id TEXT NOT NULL, + received_ts BIGINT NOT NULL +); + +DROP INDEX IF EXISTS device_federation_inbox_sender_id; +CREATE INDEX device_federation_inbox_sender_id + ON device_federation_inbox(origin, message_id); diff --git a/synapse/storage/schema/delta/35/device_stream_id.sql b/synapse/storage/schema/delta/35/device_stream_id.sql new file mode 100644 index 0000000000..7ab7d942e2 --- /dev/null +++ b/synapse/storage/schema/delta/35/device_stream_id.sql @@ -0,0 +1,21 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE TABLE device_max_stream_id ( + stream_id BIGINT NOT NULL +); + +INSERT INTO device_max_stream_id (stream_id) + SELECT COALESCE(MAX(stream_id), 0) FROM device_inbox; diff --git a/synapse/storage/schema/delta/35/event_push_actions_index.sql b/synapse/storage/schema/delta/35/event_push_actions_index.sql new file mode 100644 index 0000000000..2e836d8e9c --- /dev/null +++ b/synapse/storage/schema/delta/35/event_push_actions_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT into background_updates (update_name, progress_json) + VALUES ('epa_highlight_index', '{}'); diff --git a/synapse/storage/schema/delta/35/public_room_list_change_stream.sql b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql new file mode 100644 index 0000000000..dd2bf2e28a --- /dev/null +++ b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql @@ -0,0 +1,33 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE public_room_list_stream ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + visibility BOOLEAN NOT NULL +); + +INSERT INTO public_room_list_stream (stream_id, room_id, visibility) + SELECT 1, room_id, is_public FROM rooms + WHERE is_public = CAST(1 AS BOOLEAN); + +CREATE INDEX public_room_list_stream_idx on public_room_list_stream( + stream_id +); + +CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream( + room_id, stream_id +); diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql new file mode 100644 index 0000000000..0f1fa68a89 --- /dev/null +++ b/synapse/storage/schema/delta/35/state.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE TABLE state_group_edges( + state_group BIGINT NOT NULL, + prev_state_group BIGINT NOT NULL +); + +CREATE INDEX state_group_edges_idx ON state_group_edges(state_group); +CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group); diff --git a/synapse/storage/schema/delta/35/state_dedupe.sql b/synapse/storage/schema/delta/35/state_dedupe.sql new file mode 100644 index 0000000000..97e5067ef4 --- /dev/null +++ b/synapse/storage/schema/delta/35/state_dedupe.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('state_group_state_deduplication', '{}'); diff --git a/synapse/storage/schema/delta/35/stream_order_to_extrem.sql b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql new file mode 100644 index 0000000000..2b945d8a57 --- /dev/null +++ b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql @@ -0,0 +1,37 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE stream_ordering_to_exterm ( + stream_ordering BIGINT NOT NULL, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL +); + +INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id) + SELECT stream_ordering, room_id, event_id FROM event_forward_extremities + INNER JOIN ( + SELECT room_id, max(stream_ordering) as stream_ordering FROM events + INNER JOIN event_forward_extremities USING (room_id, event_id) + GROUP BY room_id + ) AS rms USING (room_id); + +CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm( + stream_ordering +); + +CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm( + room_id, stream_ordering +); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ec551b0b4f..7eb342674c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string +from synapse.storage.engines import PostgresEngine from twisted.internet import defer @@ -24,6 +25,9 @@ import logging logger = logging.getLogger(__name__) +MAX_STATE_DELTA_HOPS = 100 + + class StateStore(SQLBaseStore): """ Keeps track of the state at a given event. @@ -43,6 +47,20 @@ class StateStore(SQLBaseStore): * `state_groups_state`: Maps state group to state events.
""" + STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" + + def __init__(self, hs): + super(StateStore, self).__init__(hs) + self.register_background_update_handler( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, + self._background_deduplicate_state, + ) + self.register_background_update_handler( + self.STATE_GROUP_INDEX_UPDATE_NAME, + self._background_index_state, + ) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: @@ -103,11 +121,8 @@ class StateStore(SQLBaseStore): state_groups[event.event_id] = context.state_group if self._have_persisted_state_group_txn(txn, context.state_group): - logger.info("Already persisted state_group: %r", context.state_group) continue - state_event_ids = dict(context.current_state_ids) - self._simple_insert_txn( txn, table="state_groups", @@ -118,20 +133,51 @@ class StateStore(SQLBaseStore): }, ) - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { + # We persist as a delta if we can, while also ensuring the chain + # of deltas isn't too long, as otherwise read performance degrades.
+ if context.prev_group: + potential_hops = self._count_state_group_hops_txn( + txn, context.prev_group + ) + if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in state_event_ids.items() - ], - ) + "prev_state_group": context.prev_group, + }, + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) + else: + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.current_state_ids.items() + ], + ) self._simple_insert_many_txn( txn, @@ -145,6 +191,47 @@ class StateStore(SQLBaseStore): ], ) + def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. + """ + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT count(*) FROM state; + """) + + txn.execute(sql, (state_group,)) + row = txn.fetchone() + if row and row[0]: + return row[0] + else: + return 0 + else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. 
wheezy) + next_group = state_group + count = 0 + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + count += 1 + + return count + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): if event_type and state_key is not None: @@ -206,48 +293,124 @@ class StateStore(SQLBaseStore): def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ - def f(txn, groups): - if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) - else: - where_clause = "" - - sql = ( - "SELECT state_group, event_id, type, state_key" - " FROM state_groups_state WHERE" - " state_group IN (%s) %s" % ( - ",".join("?" for _ in groups), - where_clause, - ) - ) - - args = list(groups) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - - results = {group: {} for group in groups} - for row in rows: - key = (row["type"], row["state_key"]) - results[row["state_group"]][key] = row["event_id"] - return results - results = {} chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - f, chunk + self._get_state_groups_from_groups_txn, chunk, types, ) results.update(res) defer.returnValue(results) + def _get_state_groups_from_groups_txn(self, txn, groups, types=None): + results = {group: {} for group in groups} + if isinstance(self.database_engine, PostgresEngine): + # Temporarily disable sequential scans in this transaction. 
This is + # a temporary hack until we can add the right indices in + txn.execute("SET LOCAL enable_seqscan=off") + + # The below query walks the state_group tree so that the "state" + # table includes all state_groups in the tree. It then joins + # against `state_groups_state` to fetch the latest state. + # It assumes that previous state groups are always numerically + # lesser. + # The PARTITION is used to get the event_id in the greatest state + # group for the given type, state_key. + # This may return multiple rows per (type, state_key), but last_value + # should be the same. + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT type, state_key, last_value(event_id) OVER ( + PARTITION BY type, state_key ORDER BY state_group ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS event_id FROM state_groups_state + WHERE state_group IN ( + SELECT state_group FROM state + ) + %s + """) + + # Turns out that postgres doesn't like doing a list of OR's and + # is about 1000x slower, so we just issue a query for each specific + # type separately. + if types: + clause_to_args = [ + ( + "AND type = ? AND state_key = ?", + (etype, state_key) + ) + for etype, state_key in types + ] + else: + # If types is None we fetch all the state, and so just use an + # empty where clause with no extra args. + clause_to_args = [("", [])] + + for where_clause, where_args in clause_to_args: + for group in groups: + args = [group] + args.extend(where_args) + + txn.execute(sql % (where_clause,), args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + else: + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ?
AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + for group in groups: + group_tree = [group] + next_group = group + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + group_tree.append(next_group) + + sql = (""" + SELECT type, state_key, event_id FROM state_groups_state + INNER JOIN ( + SELECT type, state_key, max(state_group) as state_group + FROM state_groups_state + WHERE state_group IN (%s) %s + GROUP BY type, state_key + ) USING (type, state_key, state_group); + """) % (",".join("?" for _ in group_tree), where_clause,) + + args = list(group_tree) + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + + return results + @defer.inlineCallbacks def get_state_for_events(self, event_ids, types): """Given a list of event_ids and type tuples, return a list of state @@ -504,32 +667,184 @@ class StateStore(SQLBaseStore): defer.returnValue(results) - def get_all_new_state_groups(self, last_id, current_id, limit): - def get_all_new_state_groups_txn(txn): - sql = ( - "SELECT id, room_id, event_id FROM state_groups" - " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?" + def get_next_state_group(self): + return self._state_groups_id_gen.get_next() + + @defer.inlineCallbacks + def _background_deduplicate_state(self, progress, batch_size): + """This background update will slowly deduplicate state by reencoding + them as deltas. 
+ """ + last_state_group = progress.get("last_state_group", 0) + rows_inserted = progress.get("rows_inserted", 0) + max_group = progress.get("max_group", None) + + BATCH_SIZE_SCALE_FACTOR = 100 + + batch_size = max(1, int(batch_size / BATCH_SIZE_SCALE_FACTOR)) + + if max_group is None: + rows = yield self._execute( + "_background_deduplicate_state", None, + "SELECT coalesce(max(id), 0) FROM state_groups", ) - txn.execute(sql, (last_id, current_id, limit)) - groups = txn.fetchall() + max_group = rows[0][0] + + def reindex_txn(txn): + new_last_state_group = last_state_group + for count in xrange(batch_size): + txn.execute( + "SELECT id, room_id FROM state_groups" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC" + " LIMIT 1", + (new_last_state_group, max_group,) + ) + row = txn.fetchone() + if row: + state_group, room_id = row - if not groups: - return ([], []) + if not row or not state_group: + return True, count - lower_bound = groups[0][0] - upper_bound = groups[-1][0] - sql = ( - "SELECT state_group, type, state_key, event_id" - " FROM state_groups_state" - " WHERE ? <= state_group AND state_group <= ?" + txn.execute( + "SELECT state_group FROM state_group_edges" + " WHERE state_group = ?", + (state_group,) + ) + + # If we reach a point where we've already started inserting + # edges we should stop. + if txn.fetchall(): + return True, count + + txn.execute( + "SELECT coalesce(max(id), 0) FROM state_groups" + " WHERE id < ? AND room_id = ?", + (state_group, room_id,) + ) + prev_group, = txn.fetchone() + new_last_state_group = state_group + + if prev_group: + potential_hops = self._count_state_group_hops_txn( + txn, prev_group + ) + if potential_hops >= MAX_STATE_DELTA_HOPS: + # We want to ensure chains are at most this long, + # otherwise read performance degrades.
+ continue + + prev_state = self._get_state_groups_from_groups_txn( + txn, [prev_group], types=None + ) + prev_state = prev_state[prev_group] + + curr_state = self._get_state_groups_from_groups_txn( + txn, [state_group], types=None + ) + curr_state = curr_state[state_group] + + if not set(prev_state.keys()) - set(curr_state.keys()): + # We can only do a delta if the current has a strict super set + # of keys + + delta_state = { + key: value for key, value in curr_state.items() + if prev_state.get(key, None) != value + } + + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": state_group, + } + ) + + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": state_group, + "prev_state_group": prev_group, + } + ) + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": state_group, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": state_group, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in delta_state.items() + ], + ) + + progress = { + "last_state_group": state_group, + "rows_inserted": rows_inserted + batch_size, + "max_group": max_group, + } + + self._background_update_progress_txn( + txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress ) - txn.execute(sql, (lower_bound, upper_bound)) - state_group_state = txn.fetchall() - return (groups, state_group_state) - return self.runInteraction( - "get_all_new_state_groups", get_all_new_state_groups_txn + return False, batch_size + + finished, result = yield self.runInteraction( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, reindex_txn ) - def get_next_state_group(self): - return self._state_groups_id_gen.get_next() + if finished: + yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME) + + defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR) + + 
@defer.inlineCallbacks + def _background_index_state(self, progress, batch_size): + def reindex_txn(conn): + conn.rollback() + if isinstance(self.database_engine, PostgresEngine): + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + try: + txn = conn.cursor() + txn.execute( + "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + finally: + conn.set_session(autocommit=False) + else: + txn = conn.cursor() + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + + yield self.runWithConnection(reindex_txn) + + yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME) + + defer.returnValue(1) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0577a0525b..07ea969d4d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -531,6 +531,9 @@ class StreamStore(SQLBaseStore): ) defer.returnValue("t%d-%d" % (topo, token)) + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + def get_stream_token_for_event(self, event_id): """The stream token for an event Args: |