summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-03 11:23:39 +0100
committerErik Johnston <erik@matrix.org>2016-08-03 11:23:39 +0100
commita8a32d2714705d679584c5e10dfa79d9b4a8a76f (patch)
tree2e7c89970cca2628737dd6173feb660feb7b510b /synapse/storage
parentMerge pull request #971 from matrix-org/erikj/fed_state (diff)
downloadsynapse-a8a32d2714705d679584c5e10dfa79d9b4a8a76f.tar.xz
Ensure we only persist an event once at a time
Diffstat (limited to '')
-rw-r--r--synapse/storage/events.py19
1 files changed, 18 insertions, 1 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c63ca36df6..670aa8f118 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -26,7 +26,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 
 from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
 
 import synapse
 import synapse.metrics
@@ -403,6 +403,23 @@ class EventsStore(SQLBaseStore):
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
         """
+        # Ensure that we don't have the same event twice.
+        # Pick the earliest non-outlier if there is one, else the earliest one.
+        new_events_and_contexts = OrderedDict()
+        for event, context in events_and_contexts:
+            prev_event_context = new_events_and_contexts.get(event.event_id)
+            if prev_event_context:
+                if not event.internal_metadata.is_outlier():
+                    if prev_event_context[0].internal_metadata.is_outlier():
+                        # To ensure correct ordering we pop, as OrderedDict is
+                        # ordered by first insertion.
+                        new_events_and_contexts.pop(event.event_id, None)
+                        new_events_and_contexts[event.event_id] = (event, context)
+            else:
+                new_events_and_contexts[event.event_id] = (event, context)
+
+        events_and_contexts = new_events_and_contexts.values()
+
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids