diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 62b7790e91..385d607056 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
import twisted.internet.defer
@@ -38,24 +39,49 @@ class EndToEndKeyStore(SQLBaseStore):
query_list(list): List of pairs of user_ids and device_ids.
Returns:
Dict mapping from user-id to dict mapping from device_id to
- key json byte strings.
+ dict containing "key_json", "device_display_name".
"""
- def _get_e2e_device_keys(txn):
- result = {}
- for user_id, device_id in query_list:
- user_result = result.setdefault(user_id, {})
- keyvalues = {"user_id": user_id}
- if device_id:
- keyvalues["device_id"] = device_id
- rows = self._simple_select_list_txn(
- txn, table="e2e_device_keys_json",
- keyvalues=keyvalues,
- retcols=["device_id", "key_json"]
- )
- for row in rows:
- user_result[row["device_id"]] = row["key_json"]
- return result
- return self.runInteraction("get_e2e_device_keys", _get_e2e_device_keys)
+ if not query_list:
+ return {}
+
+ return self.runInteraction(
+ "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list
+ )
+
+ def _get_e2e_device_keys_txn(self, txn, query_list):
+ query_clauses = []
+ query_params = []
+
+ for (user_id, device_id) in query_list:
+ query_clause = "k.user_id = ?"
+ query_params.append(user_id)
+
+ if device_id:
+ query_clause += " AND k.device_id = ?"
+ query_params.append(device_id)
+
+ query_clauses.append(query_clause)
+
+ sql = (
+ "SELECT k.user_id, k.device_id, "
+ " d.display_name AS device_display_name, "
+ " k.key_json"
+ " FROM e2e_device_keys_json k"
+ " LEFT JOIN devices d ON d.user_id = k.user_id"
+ " AND d.device_id = k.device_id"
+ " WHERE %s"
+ ) % (
+ " OR ".join("(" + q + ")" for q in query_clauses)
+ )
+
+ txn.execute(sql, query_params)
+ rows = self.cursor_to_dict(txn)
+
+ result = collections.defaultdict(dict)
+ for row in rows:
+ result[row["user_id"]][row["device_id"]] = row
+
+ return result
def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list):
def _add_e2e_one_time_keys(txn):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c63ca36df6..e4dbaa3547 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -26,7 +26,8 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
+from functools import wraps
import synapse
import synapse.metrics
@@ -150,6 +151,26 @@ class _EventPeristenceQueue(object):
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+def _retry_on_integrity_error(func):
+ """Wraps a database function so that it gets retried on IntegrityError,
+ with `delete_existing=True` passed in.
+
+ Args:
+ func: function that returns a Deferred and accepts a `delete_existing` arg
+ """
+ @wraps(func)
+ @defer.inlineCallbacks
+ def f(self, *args, **kwargs):
+ try:
+ res = yield func(self, *args, **kwargs)
+ except self.database_engine.module.IntegrityError:
+ logger.exception("IntegrityError, retrying.")
+ res = yield func(self, *args, delete_existing=True, **kwargs)
+ defer.returnValue(res)
+
+ return f
+
+
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -229,8 +250,10 @@ class EventsStore(SQLBaseStore):
self._event_persist_queue.handle_queue(room_id, persisting_queue)
+ @_retry_on_integrity_error
@defer.inlineCallbacks
- def _persist_events(self, events_and_contexts, backfilled=False):
+ def _persist_events(self, events_and_contexts, backfilled=False,
+ delete_existing=False):
if not events_and_contexts:
return
@@ -273,12 +296,15 @@ class EventsStore(SQLBaseStore):
self._persist_events_txn,
events_and_contexts=chunk,
backfilled=backfilled,
+ delete_existing=delete_existing,
)
persist_event_counter.inc_by(len(chunk))
+ @_retry_on_integrity_error
@defer.inlineCallbacks
@log_function
- def _persist_event(self, event, context, current_state=None, backfilled=False):
+ def _persist_event(self, event, context, current_state=None, backfilled=False,
+ delete_existing=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
with self._state_groups_id_gen.get_next() as state_group_id:
@@ -291,6 +317,7 @@ class EventsStore(SQLBaseStore):
context=context,
current_state=current_state,
backfilled=backfilled,
+ delete_existing=delete_existing,
)
persist_event_counter.inc()
except _RollbackButIsFineException:
@@ -353,7 +380,8 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events})
@log_function
- def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
+ def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
+ delete_existing=False):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
@@ -393,16 +421,38 @@ class EventsStore(SQLBaseStore):
txn,
[(event, context)],
backfilled=backfilled,
+ delete_existing=delete_existing,
)
@log_function
- def _persist_events_txn(self, txn, events_and_contexts, backfilled):
+ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
+ delete_existing=False):
"""Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table,
and the rejections table. Things reading from those table will need to check
whether the event was rejected.
+
+ If delete_existing is True then existing events will be purged from the
+ database before insertion. This is useful when retrying due to IntegrityError.
"""
+ # Ensure that we don't have the same event twice.
+ # Pick the earliest non-outlier if there is one, else the earliest one.
+ new_events_and_contexts = OrderedDict()
+ for event, context in events_and_contexts:
+ prev_event_context = new_events_and_contexts.get(event.event_id)
+ if prev_event_context:
+ if not event.internal_metadata.is_outlier():
+ if prev_event_context[0].internal_metadata.is_outlier():
+ # To ensure correct ordering we pop, as OrderedDict is
+ # ordered by first insertion.
+ new_events_and_contexts.pop(event.event_id, None)
+ new_events_and_contexts[event.event_id] = (event, context)
+ else:
+ new_events_and_contexts[event.event_id] = (event, context)
+
+ events_and_contexts = new_events_and_contexts.values()
+
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
@@ -433,8 +483,6 @@ class EventsStore(SQLBaseStore):
for event_id, outlier in txn.fetchall()
}
- # Remove the events that we've seen before.
- event_map = {}
to_remove = set()
for event, context in events_and_contexts:
if context.rejected:
@@ -445,23 +493,6 @@ class EventsStore(SQLBaseStore):
to_remove.add(event)
continue
- # Handle the case of the list including the same event multiple
- # times. The tricky thing here is when they differ by whether
- # they are an outlier.
- if event.event_id in event_map:
- other = event_map[event.event_id]
-
- if not other.internal_metadata.is_outlier():
- to_remove.add(event)
- continue
- elif not event.internal_metadata.is_outlier():
- to_remove.add(event)
- continue
- else:
- to_remove.add(other)
-
- event_map[event.event_id] = event
-
if event.event_id not in have_persisted:
continue
@@ -539,6 +570,43 @@ class EventsStore(SQLBaseStore):
]
}
+ if delete_existing:
+ # For paranoia reasons, we go and delete all the existing entries
+ # for these events so we can reinsert them.
+ # This gets around any problems with some tables already having
+ # entries.
+
+ logger.info("Deleting existing")
+
+ for table in (
+ "events",
+ "event_auth",
+ "event_json",
+ "event_content_hashes",
+ "event_destinations",
+ "event_edge_hashes",
+ "event_edges",
+ "event_forward_extremities",
+ "event_push_actions",
+ "event_reference_hashes",
+ "event_search",
+ "event_signatures",
+ "event_to_state_groups",
+ "guest_access",
+ "history_visibility",
+ "local_invites",
+ "room_names",
+ "state_events",
+ "rejections",
+ "redactions",
+ "room_memberships",
+ "state_events"
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE event_id = ?" % (table,),
+ [(ev.event_id,) for ev, _ in events_and_contexts]
+ )
+
self._simple_insert_many_txn(
txn,
table="event_json",
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
index 140f2b63e0..aa4a3b9f2f 100644
--- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
@@ -16,4 +16,4 @@
-- make sure that we have a device record for each set of E2E keys, so that the
-- user can delete them if they like.
INSERT INTO devices
- SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json;
+ SELECT user_id, device_id, NULL FROM e2e_device_keys_json;
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql
new file mode 100644
index 0000000000..6671573398
--- /dev/null
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- a previous version of the "devices_for_e2e_keys" delta set all the device
+-- names to "unknown device". This wasn't terribly helpful
+UPDATE devices
+ SET display_name = NULL
+ WHERE display_name = 'unknown device';
|