summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-11-27 17:30:11 +0000
committerBrendan Abolivier <babolivier@matrix.org>2019-02-13 15:16:03 +0000
commit2d8da62febe67a037af2bbe99fe6a6f9426475f4 (patch)
tree980849e943347ac33901eaded2f44579219195bf
parentMangle PDUs some more. Disable presence/typing/receipts. Don't die if we can'... (diff)
downloadsynapse-2d8da62febe67a037af2bbe99fe6a6f9426475f4.tar.xz
Only relay 'live' events
-rw-r--r--synapse/federation/federation_base.py20
-rw-r--r--synapse/federation/transaction_queue.py24
-rw-r--r--synapse/handlers/federation.py9
-rw-r--r--synapse/storage/events.py2
4 files changed, 43 insertions, 12 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 671ca1a7ec..ca3c283f45 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -79,16 +79,16 @@ class FederationBase(object):
                     allow_none=True,
                 )
 
-            if not res and pdu.origin != origin:
-                try:
-                    res = yield self.get_pdu(
-                        destinations=[pdu.origin],
-                        event_id=pdu.event_id,
-                        outlier=outlier,
-                        timeout=10000,
-                    )
-                except SynapseError:
-                    pass
+            # if not res and pdu.origin != origin:
+            #     try:
+            #         res = yield self.get_pdu(
+            #             destinations=[pdu.origin],
+            #             event_id=pdu.event_id,
+            #             outlier=outlier,
+            #             timeout=10000,
+            #         )
+            #     except SynapseError:
+            #         pass
 
             if not res:
                 logger.warn(
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index f375034d5a..3854602bfd 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -177,7 +177,7 @@ class TransactionQueue(object):
 
                 @defer.inlineCallbacks
                 def handle_event(event):
-                    should_relay = yield self._should_relay(event)
+                    should_relay = yield self._should_relay(event, False)
                     logger.info("Should relay event %s: %s", event.event_id, should_relay)
                     if not should_relay:
                         return
@@ -250,6 +250,20 @@ class TransactionQueue(object):
             self._is_processing = False
 
     @defer.inlineCallbacks
+    def received_new_event(self, origin, event):
+        should_relay = yield self._should_relay(event, True)
+        logger.info("Should relay event %s: %s", event.event_id, should_relay)
+        if not should_relay:
+            return
+
+        destinations = event.unsigned.get("destinations")
+        destinations = set(destinations)
+
+        logger.debug("Sending %s to %r", event, destinations)
+
+        yield self._send_pdu(event, destinations)
+
+    @defer.inlineCallbacks
     def _send_pdu(self, pdu, destinations, span=None):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
@@ -352,12 +366,18 @@ class TransactionQueue(object):
 
         return joined_hosts
 
-    def _should_relay(self, event):
+    def _should_relay(self, event, from_federation):
         """Whether we should consider relaying this event.
         """
 
         # XXX: Hook for routing shenanigans
 
+<<<<<<< HEAD
+=======
+        if from_federation and event.unsigned.get("destinations"):
+            return True
+
+>>>>>>> efdec3252... Only relay 'live' events
         send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
         is_mine = self.is_mine_id(event.event_id)
         if not is_mine and send_on_behalf_of is None:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5053038d02..9350ff989d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -48,6 +48,7 @@ from synapse.crypto.event_signing import (
     add_hashes_and_signatures,
     compute_event_signature,
 )
+from synapse.events import FrozenEvent
 from synapse.events.validator import EventValidator
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -111,6 +112,7 @@ class FederationHandler(BaseHandler):
 
         self.store = hs.get_datastore()  # type: synapse.storage.DataStore
         self.federation_client = hs.get_federation_client()
+        self.federation_sender = hs.get_federation_sender()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -435,6 +437,10 @@ class FederationHandler(BaseHandler):
 
         logger.info("Thread ID %r", thread_id)
 
+        # Remove destinations field before persisting
+        event_copy = FrozenEvent.from_event(pdu)
+        pdu.unsigned.pop("destinations", None)
+
         yield self._process_received_pdu(
             origin,
             pdu,
@@ -443,6 +449,9 @@ class FederationHandler(BaseHandler):
             thread_id=thread_id,
         )
 
+        if sent_to_us_directly:
+            yield self.federation_sender.received_new_event(origin, event_copy)
+
         if new_thread:
             builder = self.event_builder_factory.new({
                 "type": "org.matrix.new_thread",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 187ca45ac9..118e33140e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -590,6 +590,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
                 prev_event_id IN (%s)
                 AND NOT events.outlier
                 AND rejections.event_id IS NULL
+                AND NOT events.internal_event
             """ % (
                 ",".join("?" for _ in batch),
             )
@@ -1290,6 +1291,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
                         and isinstance(event.content["url"], text_type)
                     ),
                     "thread_id": ctx.thread_id,
+                    "internal_event": event.internal_metadata.is_internal_event(),
                 }
                 for event, ctx in events_and_contexts
             ],