summary refs log tree commit diff
diff options
context:
space:
mode:
author: Erik Johnston <erik@matrix.org> 2015-02-03 10:38:14 +0000
committer: Erik Johnston <erik@matrix.org> 2015-02-03 10:39:41 +0000
commit: e7ca813dd476c83497d4130ad8efa9424d86e921 (patch)
tree: ce25a39f615e72e05230d3cca398a467ac8bb06b
parent: Don't bother requesting PDUs with bad signatures from the same server (diff)
download: synapse-e7ca813dd476c83497d4130ad8efa9424d86e921.tar.xz
Try to ensure we don't persist an event we have already persisted. In persist_event, check if we already have the event; if so, update instead of replacing, so that we don't cause a bump of the stream_ordering.
-rw-r--r-- synapse/handlers/federation.py 42
-rw-r--r-- synapse/storage/__init__.py 40
-rw-r--r-- tests/handlers/test_federation.py 5
3 files changed, 68 insertions, 19 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8bf5a4cc11..c384789c2f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -112,6 +112,14 @@ class FederationHandler(BaseHandler):
 
         logger.debug("Event: %s", event)
 
+        event_ids = set()  # IDs of the state and auth_chain events
+        if state:
+            event_ids |= {e.event_id for e in state}
+        if auth_chain:
+            event_ids |= {e.event_id for e in auth_chain}
+
+        seen_ids = (yield self.store.have_events(event_ids)).keys()
+
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
         current_state = None
@@ -124,20 +132,26 @@ class FederationHandler(BaseHandler):
             current_state = state
 
         if state and auth_chain is not None:
-            for e in state:
-                e.internal_metadata.outlier = True
-                try:
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
-                    auth = {
-                        (e.type, e.state_key): e for e in auth_chain
-                        if e.event_id in auth_ids
-                    }
-                    yield self._handle_new_event(origin, e, auth_events=auth)
-                except:
-                    logger.exception(
-                        "Failed to handle state event %s",
-                        e.event_id,
-                    )
+            for list_of_pdus in [auth_chain, state]:
+                for e in list_of_pdus:
+                    if e.event_id in seen_ids:
+                        continue
+
+                    e.internal_metadata.outlier = True
+                    try:
+                        auth_ids = [e_id for e_id, _ in e.auth_events]
+                        auth = {
+                            (e.type, e.state_key): e for e in auth_chain
+                            if e.event_id in auth_ids
+                        }
+                        yield self._handle_new_event(
+                            origin, e, auth_events=auth
+                        )
+                    except:
+                        logger.exception(
+                            "Failed to handle state event %s",
+                            e.event_id,
+                        )
 
         try:
             yield self._handle_new_event(
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b4a7a3f068..93aefe0c48 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -161,6 +161,39 @@ class DataStore(RoomMemberStore, RoomStore,
 
         outlier = event.internal_metadata.is_outlier()
 
+        have_persisted = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_json",
+            keyvalues={"event_id": event.event_id},
+            retcol="event_id",
+            allow_none=True,
+        )
+
+        metadata_json = encode_canonical_json(
+            event.internal_metadata.get_dict()
+        )
+
+        if have_persisted:
+            if not outlier:
+                sql = (
+                    "UPDATE event_json SET internal_metadata = ?"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (metadata_json.decode("UTF-8"), event.event_id,)
+                )
+
+                sql = (
+                    "UPDATE events SET outlier = 0"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (event.event_id,)
+                )
+            return
+
         event_dict = {
             k: v
             for k, v in event.get_dict().items()
@@ -170,10 +203,6 @@ class DataStore(RoomMemberStore, RoomStore,
             ]
         }
 
-        metadata_json = encode_canonical_json(
-            event.internal_metadata.get_dict()
-        )
-
         self._simple_insert_txn(
             txn,
             table="event_json",
@@ -482,6 +511,9 @@ class DataStore(RoomMemberStore, RoomStore,
             the rejected reason string if we rejected the event, else maps to
             None.
         """
+        if not event_ids:
+            return defer.succeed({})
+
         def f(txn):
             sql = (
                 "SELECT e.event_id, reason FROM events as e "
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 44dbce6bea..4270481139 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -91,7 +91,10 @@ class FederationTestCase(unittest.TestCase):
         self.datastore.persist_event.return_value = defer.succeed(None)
         self.datastore.get_room.return_value = defer.succeed(True)
         self.auth.check_host_in_room.return_value = defer.succeed(True)
-        self.datastore.have_events.return_value = defer.succeed({})
+
+        def have_events(event_ids):
+            return defer.succeed({})
+        self.datastore.have_events.side_effect = have_events
 
         def annotate(ev, old_state=None):
             context = Mock()