Use a database table to hold the users that should have full presence sent to them, instead of something in-memory (#9823)
1 files changed, 30 insertions, 33 deletions
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index a1a2b9aecc..cecdc96bf5 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -56,14 +56,6 @@ class ModuleApi:
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
self._public_room_list_manager = PublicRoomListManager(hs)
- # The next time these users sync, they will receive the current presence
- # state of all local users. Users are added by send_local_online_presence_to,
- # and removed after a successful sync.
- #
- # We make this a private variable to deter modules from accessing it directly,
- # though other classes in Synapse will still do so.
- self._send_full_presence_to_local_users = set()
-
@property
def http_client(self):
"""Allows making outbound HTTP requests to remote resources.
@@ -405,39 +397,44 @@ class ModuleApi:
Updates to remote users will be sent immediately, whereas local users will receive
them on their next sync attempt.
- Note that this method can only be run on the main or federation_sender worker
- processes.
+ Note that this method can only be run on the process that is configured to write to the
+ presence stream. By default this is the main process.
"""
- if not self._hs.should_send_federation():
+ if self._hs._instance_name not in self._hs.config.worker.writers.presence:
raise Exception(
"send_local_online_presence_to can only be run "
- "on processes that send federation",
+ "on the process that is configured to write to the "
+ "presence stream (by default this is the main process)",
)
+ local_users = set()
+ remote_users = set()
for user in users:
if self._hs.is_mine_id(user):
- # Modify SyncHandler._generate_sync_entry_for_presence to call
- # presence_source.get_new_events with an empty `from_key` if
- # that user's ID were in a list modified by ModuleApi somewhere.
- # That user would then get all presence state on next incremental sync.
-
- # Force a presence initial_sync for this user next time
- self._send_full_presence_to_local_users.add(user)
+ local_users.add(user)
else:
- # Retrieve presence state for currently online users that this user
- # is considered interested in
- presence_events, _ = await self._presence_stream.get_new_events(
- UserID.from_string(user), from_key=None, include_offline=False
- )
-
- # Send to remote destinations.
-
- # We pull out the presence handler here to break a cyclic
- # dependency between the presence router and module API.
- presence_handler = self._hs.get_presence_handler()
- await presence_handler.maybe_send_presence_to_interested_destinations(
- presence_events
- )
+ remote_users.add(user)
+
+ # We pull out the presence handler here to break a cyclic
+ # dependency between the presence router and module API.
+ presence_handler = self._hs.get_presence_handler()
+
+ if local_users:
+ # Force a presence initial_sync for these users next time they sync.
+ await presence_handler.send_full_presence_to_users(local_users)
+
+ for user in remote_users:
+ # Retrieve presence state for currently online users that this user
+ # is considered interested in.
+ presence_events, _ = await self._presence_stream.get_new_events(
+ UserID.from_string(user), from_key=None, include_offline=False
+ )
+
+ # Send to remote destinations.
+ destination = UserID.from_string(user).domain
+ presence_handler.get_federation_queue().send_presence_to_destinations(
+ presence_events, destination
+ )
class PublicRoomListManager:
|