diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/server.py | 6 | ||||
-rw-r--r-- | synapse/events/__init__.py | 12 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 3 | ||||
-rw-r--r-- | synapse/handlers/message.py | 72 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 29 |
5 files changed, 121 insertions, 1 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py index 7d56e2d141..6e5b46e6c3 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -317,6 +317,12 @@ class ServerConfig(Config): _check_resource_config(self.listeners) + # An experimental option to try and periodically clean up extremities + # by sending dummy events. + self.cleanup_extremities_with_dummy_events = config.get( + "cleanup_extremities_with_dummy_events", False, + ) + def has_tls_listener(self): return any(l["tls"] for l in self.listeners) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 1edd19cc13..f1fbb3d14a 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -92,6 +92,18 @@ class _EventInternalMetadata(object): """ return getattr(self, "soft_failed", False) + def should_proactively_send(self): + """Whether the eventm, if ours, should be sent to other clients and + servers. + + This is used for sending dummy events internally. Servers and clients + can still explicitly fetch the event. + + Returns: + bool + """ + return getattr(self, "proactively_send", True) + def _event_dict_property(key): # We want to be able to use hasattr with the event dict properties. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 4f0f939102..4224b29ecf 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -168,6 +168,9 @@ class FederationSender(object): if not is_mine and send_on_behalf_of is None: return + if not event.internal_metadata.should_proactively_send(): + return + try: # Get the state from before the event. # We need to make sure that this is the state from before diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 11650dc80c..3b5942b7ae 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -36,7 +36,7 @@ from synapse.api.urls import ConsentURIBuilder from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.storage.state import StateFilter -from synapse.types import RoomAlias, UserID +from synapse.types import RoomAlias, UserID, create_requester from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background @@ -261,6 +261,16 @@ class EventCreationHandler(object): if self._block_events_without_consent_error: self._consent_uri_builder = ConsentURIBuilder(self.config) + if ( + not self.config.worker_app + and self.config.cleanup_extremities_with_dummy_events + ): + # XXX: Send dummy events. + self.clock.looping_call( + self._send_dummy_events_to_fill_extremities, + 5 * 60 * 1000, + ) + @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, prev_events_and_hashes=None, require_consent=True): @@ -874,3 +884,63 @@ class EventCreationHandler(object): yield presence.bump_presence_active_time(user) except Exception: logger.exception("Error bumping presence active time") + + @defer.inlineCallbacks + def _send_dummy_events_to_fill_extremities(self): + """Background task to send dummy events into rooms that have a large + number of extremities + """ + + room_ids = yield self.store.get_rooms_with_many_extremities( + min_count=10, limit=5, + ) + + for room_id in room_ids: + # For each room we need to find a joined member we can use to send + # the dummy event with. + + prev_events_and_hashes = yield self.store.get_prev_events_for_room( + room_id, + ) + + latest_event_ids = ( + event_id for (event_id, _, _) in prev_events_and_hashes + ) + + members = yield self.state.get_current_users_in_room( + room_id, latest_event_ids=latest_event_ids, + ) + + user_id = None + for member in members: + if self.hs.is_mine_id(member): + user_id = member + break + + if not user_id: + # We don't have a joined user. + # TODO: We should do something here to stop the room from + # appearing next time. + continue + + requester = create_requester(user_id) + + event, context = yield self.create_event( + requester, + { + "type": "org.matrix.dummy_event", + "content": {}, + "room_id": room_id, + "sender": user_id, + }, + prev_events_and_hashes=prev_events_and_hashes, + ) + + event.internal_metadata.proactively_send = False + + yield self.send_nonmember_event( + requester, + event, + context, + ratelimit=False, + ) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 09e39c2c28..e8d16edbc8 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -190,6 +190,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas room_id, ) + def get_rooms_with_many_extremities(self, min_count, limit): + """Get the top rooms with at least N extremities. + + Args: + min_count (int): The minimum number of extremities + limit (int): The maximum number of rooms to return. + + Returns: + Deferred[list]: At most `limit` room IDs that have at least + `min_count` extremities, sorted by extremity count. + """ + + def _get_rooms_with_many_extremities_txn(txn): + sql = """ + SELECT room_id FROM event_forward_extremities + GROUP BY room_id + HAVING count(*) > ? + ORDER BY count(*) DESC + LIMIT ? + """ + + txn.execute(sql, (min_count, limit)) + return [room_id for room_id, in txn] + + return self.runInteraction( + "get_rooms_with_many_extremities", + _get_rooms_with_many_extremities_txn, + ) + @cached(max_entries=5000, iterable=True) def get_latest_event_ids_in_room(self, room_id): return self._simple_select_onecol( |