diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index e90695f026..c1b76d827b 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set
from typing_extensions import ContextManager
-from twisted.internet import address, defer, reactor
+from twisted.internet import address, reactor
import synapse
import synapse.events
@@ -111,6 +111,7 @@ from synapse.rest.client.v1.room import (
RoomSendEventRestServlet,
RoomStateEventRestServlet,
RoomStateRestServlet,
+ RoomTypingRestServlet,
)
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, sync, user_directory
@@ -374,9 +375,8 @@ class GenericWorkerPresence(BasePresenceHandler):
return _user_syncing()
- @defer.inlineCallbacks
- def notify_from_replication(self, states, stream_id):
- parties = yield get_interested_parties(self.store, states)
+ async def notify_from_replication(self, states, stream_id):
+ parties = await get_interested_parties(self.store, states)
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
@@ -386,8 +386,7 @@ class GenericWorkerPresence(BasePresenceHandler):
users=users_to_states.keys(),
)
- @defer.inlineCallbacks
- def process_replication_rows(self, token, rows):
+ async def process_replication_rows(self, token, rows):
states = [
UserPresenceState(
row.user_id,
@@ -405,7 +404,7 @@ class GenericWorkerPresence(BasePresenceHandler):
self.user_to_current_state[state.user_id] = state
stream_id = token
- yield self.notify_from_replication(states, stream_id)
+ await self.notify_from_replication(states, stream_id)
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
@@ -451,37 +450,6 @@ class GenericWorkerPresence(BasePresenceHandler):
await self._bump_active_client(user_id=user_id)
-class GenericWorkerTyping(object):
- def __init__(self, hs):
- self._latest_room_serial = 0
- self._reset()
-
- def _reset(self):
- """
- Reset the typing handler's data caches.
- """
- # map room IDs to serial numbers
- self._room_serials = {}
- # map room IDs to sets of users currently typing
- self._room_typing = {}
-
- def process_replication_rows(self, token, rows):
- if self._latest_room_serial > token:
- # The master has gone backwards. To prevent inconsistent data, just
- # clear everything.
- self._reset()
-
- # Set the latest serial token to whatever the server gave us.
- self._latest_room_serial = token
-
- for row in rows:
- self._room_serials[row.room_id] = token
- self._room_typing[row.room_id] = row.user_ids
-
- def get_current_token(self) -> int:
- return self._latest_room_serial
-
-
class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
@@ -558,6 +526,7 @@ class GenericWorkerServer(HomeServer):
KeyUploadServlet(self).register(resource)
AccountDataServlet(self).register(resource)
RoomAccountDataServlet(self).register(resource)
+ RoomTypingRestServlet(self).register(resource)
sync.register_servlets(self, resource)
events.register_servlets(self, resource)
@@ -669,9 +638,6 @@ class GenericWorkerServer(HomeServer):
def build_presence_handler(self):
return GenericWorkerPresence(self)
- def build_typing_handler(self):
- return GenericWorkerTyping(self)
-
class GenericWorkerReplicationHandler(ReplicationDataHandler):
def __init__(self, hs):
|