diff --git a/changelog.d/6053.bugfix b/changelog.d/6053.bugfix
new file mode 100644
index 0000000000..6311157bf6
--- /dev/null
+++ b/changelog.d/6053.bugfix
@@ -0,0 +1 @@
+Prevent exceptions being logged when extremity-cleanup events fail due to lack of user consent to the terms of service.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1f8272784e..0f8cce8ffe 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -222,6 +222,13 @@ class MessageHandler(object):
}
+# The duration (in ms) after which rooms should be removed
+# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
+# to generate a dummy event for them once more)
+#
+_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
+
+
class EventCreationHandler(object):
def __init__(self, hs):
self.hs = hs
@@ -258,6 +265,13 @@ class EventCreationHandler(object):
self.config.block_events_without_consent_error
)
+ # Rooms which should be excluded from dummy insertion. (For instance,
+ # those without local users who can send events into the room).
+ #
+ # map from room id to time-of-last-attempt.
+ #
+ self._rooms_to_exclude_from_dummy_event_insertion = {} # type: dict[str, int]
+
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options, but *only* if we have a configuration for which we are
# going to need it.
@@ -888,9 +902,11 @@ class EventCreationHandler(object):
"""Background task to send dummy events into rooms that have a large
number of extremities
"""
-
+ self._expire_rooms_to_exclude_from_dummy_event_insertion()
room_ids = yield self.store.get_rooms_with_many_extremities(
- min_count=10, limit=5
+ min_count=10,
+ limit=5,
+ room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(),
)
for room_id in room_ids:
@@ -904,32 +920,61 @@ class EventCreationHandler(object):
members = yield self.state.get_current_users_in_room(
room_id, latest_event_ids=latest_event_ids
)
+ dummy_event_sent = False
+ for user_id in members:
+ if not self.hs.is_mine_id(user_id):
+ continue
+ requester = create_requester(user_id)
+ try:
+ event, context = yield self.create_event(
+ requester,
+ {
+ "type": "org.matrix.dummy_event",
+ "content": {},
+ "room_id": room_id,
+ "sender": user_id,
+ },
+ prev_events_and_hashes=prev_events_and_hashes,
+ )
- user_id = None
- for member in members:
- if self.hs.is_mine_id(member):
- user_id = member
- break
-
- if not user_id:
- # We don't have a joined user.
- # TODO: We should do something here to stop the room from
- # appearing next time.
- continue
+ event.internal_metadata.proactively_send = False
- requester = create_requester(user_id)
+ yield self.send_nonmember_event(
+ requester, event, context, ratelimit=False
+ )
+ dummy_event_sent = True
+ break
+ except ConsentNotGivenError:
+ logger.info(
+ "Failed to send dummy event into room %s for user %s due to "
+ "lack of consent. Will try another user" % (room_id, user_id)
+ )
+ except AuthError:
+ logger.info(
+ "Failed to send dummy event into room %s for user %s due to "
+ "lack of power. Will try another user" % (room_id, user_id)
+ )
- event, context = yield self.create_event(
- requester,
- {
- "type": "org.matrix.dummy_event",
- "content": {},
- "room_id": room_id,
- "sender": user_id,
- },
- prev_events_and_hashes=prev_events_and_hashes,
+ if not dummy_event_sent:
+ # Did not find a valid user in the room, so remove from future attempts
+ # Exclusion is time limited, so the room will be rechecked in the future
+ # dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+ logger.info(
+ "Failed to send dummy event into room %s. Will exclude it from "
+ "future attempts until cache expires" % (room_id,)
+ )
+ now = self.clock.time_msec()
+ self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
+
+ def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
+ expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+ to_expire = set()
+ for room_id, time in self._rooms_to_exclude_from_dummy_event_insertion.items():
+ if time < expire_before:
+ to_expire.add(room_id)
+ for room_id in to_expire:
+ logger.debug(
+ "Expiring room id %s from dummy event insertion exclusion cache",
+ room_id,
)
-
- event.internal_metadata.proactively_send = False
-
- yield self.send_nonmember_event(requester, event, context, ratelimit=False)
+ del self._rooms_to_exclude_from_dummy_event_insertion[room_id]
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 4f500d893e..f5e8c39262 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import itertools
import logging
import random
@@ -190,12 +191,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id,
)
- def get_rooms_with_many_extremities(self, min_count, limit):
+ def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
"""Get the top rooms with at least N extremities.
Args:
min_count (int): The minimum number of extremities
limit (int): The maximum number of rooms to return.
+ room_id_filter (iterable[str]): room_ids to exclude from the results
Returns:
Deferred[list]: At most `limit` room IDs that have at least
@@ -203,15 +205,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
"""
def _get_rooms_with_many_extremities_txn(txn):
+ where_clause = "1=1"
+ if room_id_filter:
+ where_clause = "room_id NOT IN (%s)" % (
+ ",".join("?" for _ in room_id_filter),
+ )
+
sql = """
SELECT room_id FROM event_forward_extremities
+ WHERE %s
GROUP BY room_id
HAVING count(*) > ?
ORDER BY count(*) DESC
LIMIT ?
- """
+ """ % (
+ where_clause,
+ )
- txn.execute(sql, (min_count, limit))
+ query_args = list(itertools.chain(room_id_filter, [min_count, limit]))
+ txn.execute(sql, query_args)
return [room_id for room_id, in txn]
return self.runInteraction(
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
index e9e2d5337c..34f9c72709 100644
--- a/tests/storage/test_cleanup_extrems.py
+++ b/tests/storage/test_cleanup_extrems.py
@@ -14,7 +14,13 @@
# limitations under the License.
import os.path
+from unittest.mock import patch
+from mock import Mock
+
+import synapse.rest.admin
+from synapse.api.constants import EventTypes
+from synapse.rest.client.v1 import login, room
from synapse.storage import prepare_database
from synapse.types import Requester, UserID
@@ -225,6 +231,14 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
+ CONSENT_VERSION = "1"
+ EXTREMITIES_COUNT = 50
+ servlets = [
+ synapse.rest.admin.register_servlets_for_client_rest_resource,
+ login.register_servlets,
+ room.register_servlets,
+ ]
+
def make_homeserver(self, reactor, clock):
config = self.default_config()
config["cleanup_extremities_with_dummy_events"] = True
@@ -233,28 +247,39 @@ class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.store = homeserver.get_datastore()
self.room_creator = homeserver.get_room_creation_handler()
+ self.event_creator_handler = homeserver.get_event_creation_handler()
# Create a test user and room
- self.user = UserID("alice", "test")
+ self.user = UserID.from_string(self.register_user("user1", "password"))
+ self.token1 = self.login("user1", "password")
self.requester = Requester(self.user, None, False, None, None)
info = self.get_success(self.room_creator.create_room(self.requester, {}))
self.room_id = info["room_id"]
+ self.event_creator = homeserver.get_event_creation_handler()
+ homeserver.config.user_consent_version = self.CONSENT_VERSION
def test_send_dummy_event(self):
- # Create a bushy graph with 50 extremities.
-
- event_id_start = self.create_and_send_event(self.room_id, self.user)
+ self._create_extremity_rich_graph()
- for _ in range(50):
- self.create_and_send_event(
- self.room_id, self.user, prev_event_ids=[event_id_start]
- )
+ # Pump the reactor repeatedly so that the background updates have a
+ # chance to run.
+ self.pump(10 * 60)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
- self.assertEqual(len(latest_event_ids), 50)
+ self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
+ @patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=0)
+ def test_send_dummy_events_when_insufficient_power(self):
+ self._create_extremity_rich_graph()
+ # Criple power levels
+ self.helper.send_state(
+ self.room_id,
+ EventTypes.PowerLevels,
+ body={"users": {str(self.user): -1}},
+ tok=self.token1,
+ )
# Pump the reactor repeatedly so that the background updates have a
# chance to run.
self.pump(10 * 60)
@@ -262,4 +287,108 @@ class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
+ # Check that the room has not been pruned
+ self.assertTrue(len(latest_event_ids) > 10)
+
+ # New user with regular levels
+ user2 = self.register_user("user2", "password")
+ token2 = self.login("user2", "password")
+ self.helper.join(self.room_id, user2, tok=token2)
+ self.pump(10 * 60)
+
+ latest_event_ids = self.get_success(
+ self.store.get_latest_event_ids_in_room(self.room_id)
+ )
+ self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
+
+ @patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=0)
+ def test_send_dummy_event_without_consent(self):
+ self._create_extremity_rich_graph()
+ self._enable_consent_checking()
+
+ # Pump the reactor repeatedly so that the background updates have a
+ # chance to run. Attempt to add dummy event with user that has not consented
+ # Check that dummy event send fails.
+ self.pump(10 * 60)
+ latest_event_ids = self.get_success(
+ self.store.get_latest_event_ids_in_room(self.room_id)
+ )
+ self.assertTrue(len(latest_event_ids) == self.EXTREMITIES_COUNT)
+
+ # Create new user, and add consent
+ user2 = self.register_user("user2", "password")
+ token2 = self.login("user2", "password")
+ self.get_success(
+ self.store.user_set_consent_version(user2, self.CONSENT_VERSION)
+ )
+ self.helper.join(self.room_id, user2, tok=token2)
+
+ # Background updates should now cause a dummy event to be added to the graph
+ self.pump(10 * 60)
+
+ latest_event_ids = self.get_success(
+ self.store.get_latest_event_ids_in_room(self.room_id)
+ )
self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
+
+ @patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=250)
+ def test_expiry_logic(self):
+ """Simple test to ensure that _expire_rooms_to_exclude_from_dummy_event_insertion()
+ expires old entries correctly.
+ """
+ self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
+ "1"
+ ] = 100000
+ self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
+ "2"
+ ] = 200000
+ self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
+ "3"
+ ] = 300000
+ self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
+ # All entries within time frame
+ self.assertEqual(
+ len(
+ self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
+ ),
+ 3,
+ )
+ # Oldest room to expire
+ self.pump(1)
+ self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
+ self.assertEqual(
+ len(
+ self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
+ ),
+ 2,
+ )
+ # All rooms to expire
+ self.pump(2)
+ self.assertEqual(
+ len(
+ self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
+ ),
+ 0,
+ )
+
+ def _create_extremity_rich_graph(self):
+ """Helper method to create bushy graph on demand"""
+
+ event_id_start = self.create_and_send_event(self.room_id, self.user)
+
+ for _ in range(self.EXTREMITIES_COUNT):
+ self.create_and_send_event(
+ self.room_id, self.user, prev_event_ids=[event_id_start]
+ )
+
+ latest_event_ids = self.get_success(
+ self.store.get_latest_event_ids_in_room(self.room_id)
+ )
+ self.assertEqual(len(latest_event_ids), 50)
+
+ def _enable_consent_checking(self):
+ """Helper method to enable consent checking"""
+ self.event_creator._block_events_without_consent_error = "No consent from user"
+ consent_uri_builder = Mock()
+ consent_uri_builder.build_user_consent_uri.return_value = "http://example.com"
+ self.event_creator._consent_uri_builder = consent_uri_builder
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 86c7ac350d..b58386994e 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -75,3 +75,43 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
el = r[i]
depth = el[2]
self.assertLessEqual(5, depth)
+
+ @defer.inlineCallbacks
+ def test_get_rooms_with_many_extremities(self):
+ room1 = "#room1"
+ room2 = "#room2"
+ room3 = "#room3"
+
+ def insert_event(txn, i, room_id):
+ event_id = "$event_%i:local" % i
+ txn.execute(
+ (
+ "INSERT INTO event_forward_extremities (room_id, event_id) "
+ "VALUES (?, ?)"
+ ),
+ (room_id, event_id),
+ )
+
+ for i in range(0, 20):
+ yield self.store.runInteraction("insert", insert_event, i, room1)
+ yield self.store.runInteraction("insert", insert_event, i, room2)
+ yield self.store.runInteraction("insert", insert_event, i, room3)
+
+ # Test simple case
+ r = yield self.store.get_rooms_with_many_extremities(5, 5, [])
+ self.assertEqual(len(r), 3)
+
+ # Does filter work?
+
+ r = yield self.store.get_rooms_with_many_extremities(5, 5, [room1])
+ self.assertTrue(room2 in r)
+ self.assertTrue(room3 in r)
+ self.assertEqual(len(r), 2)
+
+ r = yield self.store.get_rooms_with_many_extremities(5, 5, [room1, room2])
+ self.assertEqual(r, [room3])
+
+ # Does filter and limit work?
+
+ r = yield self.store.get_rooms_with_many_extremities(5, 1, [room1])
+ self.assertTrue(r == [room2] or r == [room3])
|