summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py48
1 files changed, 7 insertions, 41 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py

index e90695f026..c1b76d827b 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py
@@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set from typing_extensions import ContextManager -from twisted.internet import address, defer, reactor +from twisted.internet import address, reactor import synapse import synapse.events @@ -111,6 +111,7 @@ from synapse.rest.client.v1.room import ( RoomSendEventRestServlet, RoomStateEventRestServlet, RoomStateRestServlet, + RoomTypingRestServlet, ) from synapse.rest.client.v1.voip import VoipRestServlet from synapse.rest.client.v2_alpha import groups, sync, user_directory @@ -374,9 +375,8 @@ class GenericWorkerPresence(BasePresenceHandler): return _user_syncing() - @defer.inlineCallbacks - def notify_from_replication(self, states, stream_id): - parties = yield get_interested_parties(self.store, states) + async def notify_from_replication(self, states, stream_id): + parties = await get_interested_parties(self.store, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -386,8 +386,7 @@ class GenericWorkerPresence(BasePresenceHandler): users=users_to_states.keys(), ) - @defer.inlineCallbacks - def process_replication_rows(self, token, rows): + async def process_replication_rows(self, token, rows): states = [ UserPresenceState( row.user_id, @@ -405,7 +404,7 @@ class GenericWorkerPresence(BasePresenceHandler): self.user_to_current_state[state.user_id] = state stream_id = token - yield self.notify_from_replication(states, stream_id) + await self.notify_from_replication(states, stream_id) def get_currently_syncing_users_for_replication(self) -> Iterable[str]: return [ @@ -451,37 +450,6 @@ class GenericWorkerPresence(BasePresenceHandler): await self._bump_active_client(user_id=user_id) -class GenericWorkerTyping(object): - def __init__(self, hs): - self._latest_room_serial = 0 - self._reset() - - def _reset(self): - """ - Reset the typing handler's data caches. - """ - # map room IDs to serial numbers - self._room_serials = {} - # map room IDs to sets of users currently typing - self._room_typing = {} - - def process_replication_rows(self, token, rows): - if self._latest_room_serial > token: - # The master has gone backwards. To prevent inconsistent data, just - # clear everything. - self._reset() - - # Set the latest serial token to whatever the server gave us. - self._latest_room_serial = token - - for row in rows: - self._room_serials[row.room_id] = token - self._room_typing[row.room_id] = row.user_ids - - def get_current_token(self) -> int: - return self._latest_room_serial - - class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. @@ -558,6 +526,7 @@ class GenericWorkerServer(HomeServer): KeyUploadServlet(self).register(resource) AccountDataServlet(self).register(resource) RoomAccountDataServlet(self).register(resource) + RoomTypingRestServlet(self).register(resource) sync.register_servlets(self, resource) events.register_servlets(self, resource) @@ -669,9 +638,6 @@ class GenericWorkerServer(HomeServer): def build_presence_handler(self): return GenericWorkerPresence(self) - def build_typing_handler(self): - return GenericWorkerTyping(self) - class GenericWorkerReplicationHandler(ReplicationDataHandler): def __init__(self, hs):