diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/message.py | 99 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 18 |
2 files changed, 87 insertions, 30 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1f8272784e..0f8cce8ffe 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -222,6 +222,13 @@ class MessageHandler(object): } +# The duration (in ms) after which rooms should be removed +# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try +# to generate a dummy event for them once more) +# +_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000 + + class EventCreationHandler(object): def __init__(self, hs): self.hs = hs @@ -258,6 +265,13 @@ class EventCreationHandler(object): self.config.block_events_without_consent_error ) + # Rooms which should be excluded from dummy insertion. (For instance, + # those without local users who can send events into the room). + # + # map from room id to time-of-last-attempt. + # + self._rooms_to_exclude_from_dummy_event_insertion = {} # type: dict[str, int] + # we need to construct a ConsentURIBuilder here, as it checks that the necessary # config options, but *only* if we have a configuration for which we are # going to need it. @@ -888,9 +902,11 @@ class EventCreationHandler(object): """Background task to send dummy events into rooms that have a large number of extremities """ - + self._expire_rooms_to_exclude_from_dummy_event_insertion() room_ids = yield self.store.get_rooms_with_many_extremities( - min_count=10, limit=5 + min_count=10, + limit=5, + room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(), ) for room_id in room_ids: @@ -904,32 +920,61 @@ class EventCreationHandler(object): members = yield self.state.get_current_users_in_room( room_id, latest_event_ids=latest_event_ids ) + dummy_event_sent = False + for user_id in members: + if not self.hs.is_mine_id(user_id): + continue + requester = create_requester(user_id) + try: + event, context = yield self.create_event( + requester, + { + "type": "org.matrix.dummy_event", + "content": {}, + "room_id": room_id, + "sender": user_id, + }, + prev_events_and_hashes=prev_events_and_hashes, + ) - user_id = None - for member in members: - if self.hs.is_mine_id(member): - user_id = member - break - - if not user_id: - # We don't have a joined user. - # TODO: We should do something here to stop the room from - # appearing next time. - continue + event.internal_metadata.proactively_send = False - requester = create_requester(user_id) + yield self.send_nonmember_event( + requester, event, context, ratelimit=False + ) + dummy_event_sent = True + break + except ConsentNotGivenError: + logger.info( + "Failed to send dummy event into room %s for user %s due to " + "lack of consent. Will try another user" % (room_id, user_id) + ) + except AuthError: + logger.info( + "Failed to send dummy event into room %s for user %s due to " + "lack of power. Will try another user" % (room_id, user_id) + ) - event, context = yield self.create_event( - requester, - { - "type": "org.matrix.dummy_event", - "content": {}, - "room_id": room_id, - "sender": user_id, - }, - prev_events_and_hashes=prev_events_and_hashes, + if not dummy_event_sent: + # Did not find a valid user in the room, so remove from future attempts + # Exclusion is time limited, so the room will be rechecked in the future + # dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY + logger.info( + "Failed to send dummy event into room %s. Will exclude it from " + "future attempts until cache expires" % (room_id,) + ) + now = self.clock.time_msec() + self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now + + def _expire_rooms_to_exclude_from_dummy_event_insertion(self): + expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY + to_expire = set() + for room_id, time in self._rooms_to_exclude_from_dummy_event_insertion.items(): + if time < expire_before: + to_expire.add(room_id) + for room_id in to_expire: + logger.debug( + "Expiring room id %s from dummy event insertion exclusion cache", + room_id, ) - - event.internal_metadata.proactively_send = False - - yield self.send_nonmember_event(requester, event, context, ratelimit=False) + del self._rooms_to_exclude_from_dummy_event_insertion[room_id] diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 4f500d893e..f5e8c39262 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging import random @@ -190,12 +191,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas room_id, ) - def get_rooms_with_many_extremities(self, min_count, limit): + def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter): """Get the top rooms with at least N extremities. Args: min_count (int): The minimum number of extremities limit (int): The maximum number of rooms to return. + room_id_filter (iterable[str]): room_ids to exclude from the results Returns: Deferred[list]: At most `limit` room IDs that have at least @@ -203,15 +205,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas """ def _get_rooms_with_many_extremities_txn(txn): + where_clause = "1=1" + if room_id_filter: + where_clause = "room_id NOT IN (%s)" % ( + ",".join("?" for _ in room_id_filter), + ) + sql = """ SELECT room_id FROM event_forward_extremities + WHERE %s GROUP BY room_id HAVING count(*) > ? ORDER BY count(*) DESC LIMIT ? - """ + """ % ( + where_clause, + ) - txn.execute(sql, (min_count, limit)) + query_args = list(itertools.chain(room_id_filter, [min_count, limit])) + txn.execute(sql, query_args) return [room_id for room_id, in txn] return self.runInteraction( |