diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/end_to_end_keys.py | 60 | ||||
-rw-r--r-- | synapse/storage/events.py | 116 | ||||
-rw-r--r-- | synapse/storage/schema/delta/33/devices_for_e2e_keys.sql | 2 | ||||
-rw-r--r-- | synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql | 20 |
4 files changed, 156 insertions, 42 deletions
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 62b7790e91..385d607056 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import collections import twisted.internet.defer @@ -38,24 +39,49 @@ class EndToEndKeyStore(SQLBaseStore): query_list(list): List of pairs of user_ids and device_ids. Returns: Dict mapping from user-id to dict mapping from device_id to - key json byte strings. + dict containing "key_json", "device_display_name". """ - def _get_e2e_device_keys(txn): - result = {} - for user_id, device_id in query_list: - user_result = result.setdefault(user_id, {}) - keyvalues = {"user_id": user_id} - if device_id: - keyvalues["device_id"] = device_id - rows = self._simple_select_list_txn( - txn, table="e2e_device_keys_json", - keyvalues=keyvalues, - retcols=["device_id", "key_json"] - ) - for row in rows: - user_result[row["device_id"]] = row["key_json"] - return result - return self.runInteraction("get_e2e_device_keys", _get_e2e_device_keys) + if not query_list: + return {} + + return self.runInteraction( + "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list + ) + + def _get_e2e_device_keys_txn(self, txn, query_list): + query_clauses = [] + query_params = [] + + for (user_id, device_id) in query_list: + query_clause = "k.user_id = ?" + query_params.append(user_id) + + if device_id: + query_clause += " AND k.device_id = ?" + query_params.append(device_id) + + query_clauses.append(query_clause) + + sql = ( + "SELECT k.user_id, k.device_id, " + " d.display_name AS device_display_name, " + " k.key_json" + " FROM e2e_device_keys_json k" + " LEFT JOIN devices d ON d.user_id = k.user_id" + " AND d.device_id = k.device_id" + " WHERE %s" + ) % ( + " OR ".join("(" + q + ")" for q in query_clauses) + ) + + txn.execute(sql, query_params) + rows = self.cursor_to_dict(txn) + + result = collections.defaultdict(dict) + for row in rows: + result[row["user_id"]][row["device_id"]] = row + + return result def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list): def _add_e2e_one_time_keys(txn): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c63ca36df6..e4dbaa3547 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -26,7 +26,8 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from canonicaljson import encode_canonical_json -from collections import deque, namedtuple +from collections import deque, namedtuple, OrderedDict +from functools import wraps import synapse import synapse.metrics @@ -150,6 +151,26 @@ class _EventPeristenceQueue(object): _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) +def _retry_on_integrity_error(func): + """Wraps a database function so that it gets retried on IntegrityError, + with `delete_existing=True` passed in. + + Args: + func: function that returns a Deferred and accepts a `delete_existing` arg + """ + @wraps(func) + @defer.inlineCallbacks + def f(self, *args, **kwargs): + try: + res = yield func(self, *args, **kwargs) + except self.database_engine.module.IntegrityError: + logger.exception("IntegrityError, retrying.") + res = yield func(self, *args, delete_existing=True, **kwargs) + defer.returnValue(res) + + return f + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -229,8 +250,10 @@ class EventsStore(SQLBaseStore): self._event_persist_queue.handle_queue(room_id, persisting_queue) + @_retry_on_integrity_error @defer.inlineCallbacks - def _persist_events(self, events_and_contexts, backfilled=False): + def _persist_events(self, events_and_contexts, backfilled=False, + delete_existing=False): if not events_and_contexts: return @@ -273,12 +296,15 @@ class EventsStore(SQLBaseStore): self._persist_events_txn, events_and_contexts=chunk, backfilled=backfilled, + delete_existing=delete_existing, ) persist_event_counter.inc_by(len(chunk)) + @_retry_on_integrity_error @defer.inlineCallbacks @log_function - def _persist_event(self, event, context, current_state=None, backfilled=False): + def _persist_event(self, event, context, current_state=None, backfilled=False, + delete_existing=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -291,6 +317,7 @@ class EventsStore(SQLBaseStore): context=context, current_state=current_state, backfilled=backfilled, + delete_existing=delete_existing, ) persist_event_counter.inc() except _RollbackButIsFineException: @@ -353,7 +380,8 @@ class EventsStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) @log_function - def _persist_event_txn(self, txn, event, context, current_state, backfilled=False): + def _persist_event_txn(self, txn, event, context, current_state, backfilled=False, + delete_existing=False): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: @@ -393,16 +421,38 @@ class EventsStore(SQLBaseStore): txn, [(event, context)], backfilled=backfilled, + delete_existing=delete_existing, ) @log_function - def _persist_events_txn(self, txn, events_and_contexts, backfilled): + def _persist_events_txn(self, txn, events_and_contexts, backfilled, + delete_existing=False): """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table, and the rejections table. Things reading from those table will need to check whether the event was rejected. + + If delete_existing is True then existing events will be purged from the + database before insertion. This is useful when retrying due to IntegrityError. """ + # Ensure that we don't have the same event twice. + # Pick the earliest non-outlier if there is one, else the earliest one. + new_events_and_contexts = OrderedDict() + for event, context in events_and_contexts: + prev_event_context = new_events_and_contexts.get(event.event_id) + if prev_event_context: + if not event.internal_metadata.is_outlier(): + if prev_event_context[0].internal_metadata.is_outlier(): + # To ensure correct ordering we pop, as OrderedDict is + # ordered by first insertion. + new_events_and_contexts.pop(event.event_id, None) + new_events_and_contexts[event.event_id] = (event, context) + else: + new_events_and_contexts[event.event_id] = (event, context) + + events_and_contexts = new_events_and_contexts.values() + depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -433,8 +483,6 @@ class EventsStore(SQLBaseStore): for event_id, outlier in txn.fetchall() } - # Remove the events that we've seen before. - event_map = {} to_remove = set() for event, context in events_and_contexts: if context.rejected: @@ -445,23 +493,6 @@ class EventsStore(SQLBaseStore): to_remove.add(event) continue - # Handle the case of the list including the same event multiple - # times. The tricky thing here is when they differ by whether - # they are an outlier. - if event.event_id in event_map: - other = event_map[event.event_id] - - if not other.internal_metadata.is_outlier(): - to_remove.add(event) - continue - elif not event.internal_metadata.is_outlier(): - to_remove.add(event) - continue - else: - to_remove.add(other) - - event_map[event.event_id] = event - if event.event_id not in have_persisted: continue @@ -539,6 +570,43 @@ class EventsStore(SQLBaseStore): ] } + if delete_existing: + # For paranoia reasons, we go and delete all the existing entries + # for these events so we can reinsert them. + # This gets around any problems with some tables already having + # entries. + + logger.info("Deleting existing") + + for table in ( + "events", + "event_auth", + "event_json", + "event_content_hashes", + "event_destinations", + "event_edge_hashes", + "event_edges", + "event_forward_extremities", + "event_push_actions", + "event_reference_hashes", + "event_search", + "event_signatures", + "event_to_state_groups", + "guest_access", + "history_visibility", + "local_invites", + "room_names", + "state_events", + "rejections", + "redactions", + "room_memberships", + "state_events" + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + self._simple_insert_many_txn( txn, table="event_json", diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql index 140f2b63e0..aa4a3b9f2f 100644 --- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql @@ -16,4 +16,4 @@ -- make sure that we have a device record for each set of E2E keys, so that the -- user can delete them if they like. INSERT INTO devices - SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json; + SELECT user_id, device_id, NULL FROM e2e_device_keys_json; diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql new file mode 100644 index 0000000000..6671573398 --- /dev/null +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- a previous version of the "devices_for_e2e_keys" delta set all the device +-- names to "unknown device". This wasn't terribly helpful +UPDATE devices + SET display_name = NULL + WHERE display_name = 'unknown device'; |