summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/5480.misc1
-rw-r--r--synapse/config/server.py6
-rw-r--r--synapse/events/__init__.py12
-rw-r--r--synapse/federation/sender/__init__.py3
-rw-r--r--synapse/handlers/message.py75
-rw-r--r--synapse/storage/event_federation.py29
-rw-r--r--tests/storage/test_cleanup_extrems.py41
7 files changed, 166 insertions, 1 deletions
diff --git a/changelog.d/5480.misc b/changelog.d/5480.misc
new file mode 100644
index 0000000000..3001bcc1fe
--- /dev/null
+++ b/changelog.d/5480.misc
@@ -0,0 +1 @@
+Add an EXPERIMENTAL config option to try and periodically clean up extremities by sending dummy events.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 7d56e2d141..6e5b46e6c3 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -317,6 +317,12 @@ class ServerConfig(Config):
 
         _check_resource_config(self.listeners)
 
+        # An experimental option to try and periodically clean up extremities
+        # by sending dummy events.
+        self.cleanup_extremities_with_dummy_events = config.get(
+            "cleanup_extremities_with_dummy_events", False,
+        )
+
     def has_tls_listener(self):
         return any(l["tls"] for l in self.listeners)
 
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 1edd19cc13..7154bcbea6 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -92,6 +92,18 @@ class _EventInternalMetadata(object):
         """
         return getattr(self, "soft_failed", False)
 
+    def should_proactively_send(self):
+        """Whether the event, if ours, should be sent to other clients and
+        servers.
+
+        This is used for sending dummy events internally. Servers and clients
+        can still explicitly fetch the event.
+
+        Returns:
+            bool
+        """
+        return getattr(self, "proactively_send", True)
+
 
 def _event_dict_property(key):
     # We want to be able to use hasattr with the event dict properties.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4f0f939102..4224b29ecf 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -168,6 +168,9 @@ class FederationSender(object):
                     if not is_mine and send_on_behalf_of is None:
                         return
 
+                    if not event.internal_metadata.should_proactively_send():
+                        return
+
                     try:
                         # Get the state from before the event.
                         # We need to make sure that this is the state from before
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 11650dc80c..7728ea230d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,9 +34,10 @@ from synapse.api.errors import (
 from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events.validator import EventValidator
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.storage.state import StateFilter
-from synapse.types import RoomAlias, UserID
+from synapse.types import RoomAlias, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
@@ -261,6 +262,18 @@ class EventCreationHandler(object):
         if self._block_events_without_consent_error:
             self._consent_uri_builder = ConsentURIBuilder(self.config)
 
+        if (
+            not self.config.worker_app
+            and self.config.cleanup_extremities_with_dummy_events
+        ):
+            self.clock.looping_call(
+                lambda: run_as_background_process(
+                    "send_dummy_events_to_fill_extremities",
+                    self._send_dummy_events_to_fill_extremities
+                ),
+                5 * 60 * 1000,
+            )
+
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
                      prev_events_and_hashes=None, require_consent=True):
@@ -874,3 +887,63 @@ class EventCreationHandler(object):
             yield presence.bump_presence_active_time(user)
         except Exception:
             logger.exception("Error bumping presence active time")
+
+    @defer.inlineCallbacks
+    def _send_dummy_events_to_fill_extremities(self):
+        """Background task to send dummy events into rooms that have a large
+        number of extremities
+        """
+
+        room_ids = yield self.store.get_rooms_with_many_extremities(
+            min_count=10, limit=5,
+        )
+
+        for room_id in room_ids:
+            # For each room we need to find a joined member we can use to send
+            # the dummy event with.
+
+            prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+                room_id,
+            )
+
+            latest_event_ids = (
+                event_id for (event_id, _, _) in prev_events_and_hashes
+            )
+
+            members = yield self.state.get_current_users_in_room(
+                room_id, latest_event_ids=latest_event_ids,
+            )
+
+            user_id = None
+            for member in members:
+                if self.hs.is_mine_id(member):
+                    user_id = member
+                    break
+
+            if not user_id:
+                # We don't have a joined user.
+                # TODO: We should do something here to stop the room from
+                # appearing next time.
+                continue
+
+            requester = create_requester(user_id)
+
+            event, context = yield self.create_event(
+                requester,
+                {
+                    "type": "org.matrix.dummy_event",
+                    "content": {},
+                    "room_id": room_id,
+                    "sender": user_id,
+                },
+                prev_events_and_hashes=prev_events_and_hashes,
+            )
+
+            event.internal_metadata.proactively_send = False
+
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=False,
+            )
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 09e39c2c28..e8d16edbc8 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -190,6 +190,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             room_id,
         )
 
+    def get_rooms_with_many_extremities(self, min_count, limit):
+        """Get the top rooms with at least N extremities.
+
+        Args:
+            min_count (int): The minimum number of extremities
+            limit (int): The maximum number of rooms to return.
+
+        Returns:
+            Deferred[list]: At most `limit` room IDs that have at least
+            `min_count` extremities, sorted by extremity count.
+        """
+
+        def _get_rooms_with_many_extremities_txn(txn):
+            sql = """
+                SELECT room_id FROM event_forward_extremities
+                GROUP BY room_id
+                HAVING count(*) > ?
+                ORDER BY count(*) DESC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (min_count, limit))
+            return [room_id for room_id, in txn]
+
+        return self.runInteraction(
+            "get_rooms_with_many_extremities",
+            _get_rooms_with_many_extremities_txn,
+        )
+
     @cached(max_entries=5000, iterable=True)
     def get_latest_event_ids_in_room(self, room_id):
         return self._simple_select_onecol(
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
index f4c81ef77d..e9e2d5337c 100644
--- a/tests/storage/test_cleanup_extrems.py
+++ b/tests/storage/test_cleanup_extrems.py
@@ -222,3 +222,44 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
             self.store.get_latest_event_ids_in_room(self.room_id)
         )
         self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))
+
+
+class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        config = self.default_config()
+        config["cleanup_extremities_with_dummy_events"] = True
+        return self.setup_test_homeserver(config=config)
+
+    def prepare(self, reactor, clock, homeserver):
+        self.store = homeserver.get_datastore()
+        self.room_creator = homeserver.get_room_creation_handler()
+
+        # Create a test user and room
+        self.user = UserID("alice", "test")
+        self.requester = Requester(self.user, None, False, None, None)
+        info = self.get_success(self.room_creator.create_room(self.requester, {}))
+        self.room_id = info["room_id"]
+
+    def test_send_dummy_event(self):
+        # Create a bushy graph with 50 extremities.
+
+        event_id_start = self.create_and_send_event(self.room_id, self.user)
+
+        for _ in range(50):
+            self.create_and_send_event(
+                self.room_id, self.user, prev_event_ids=[event_id_start]
+            )
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(len(latest_event_ids), 50)
+
+        # Pump the reactor repeatedly so that the background updates have a
+        # chance to run.
+        self.pump(10 * 60)
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))