diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-24 15:51:41 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-03-24 15:54:38 +0000 |
commit | 7eec84bfbec26ffcd3835ecb2af0ac2d5b352fd6 (patch) | |
tree | 2b040ea3e63db328567babc939520e72d600aba3 | |
parent | Make ReplicationStreamer work on workers (diff) | |
download | synapse-7eec84bfbec26ffcd3835ecb2af0ac2d5b352fd6.tar.xz |
Shuffle around code typing handlers
-rw-r--r-- | synapse/app/generic_worker.py | 63 | ||||
-rw-r--r-- | synapse/config/server.py | 2 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 47 | ||||
-rw-r--r-- | synapse/replication/tcp/handler.py | 71 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 8 | ||||
-rw-r--r-- | synapse/server.py | 11 |
6 files changed, 101 insertions, 101 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index b975fb3e1c..9eba101d0a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -66,7 +66,7 @@ from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientFactory from synapse.replication.tcp.commands import ClearUserSyncsCommand -from synapse.replication.tcp.handler import WorkerReplicationDataHandler +from synapse.replication.tcp.handler import ReplicationDataHandler from synapse.replication.tcp.streams import ( AccountDataStream, DeviceListsStream, @@ -77,7 +77,6 @@ from synapse.replication.tcp.streams import ( ReceiptsStream, TagAccountDataStream, ToDeviceStream, - TypingStream, ) from synapse.replication.tcp.streams.events import ( EventsStream, @@ -381,43 +380,6 @@ class GenericWorkerPresence(object): return set() -class GenericWorkerTyping(object): - def __init__(self, hs): - self._latest_room_serial = 0 - self._reset() - - def _reset(self): - """ - Reset the typing handler's data caches. - """ - # map room IDs to serial numbers - self._room_serials = {} - # map room IDs to sets of users currently typing - self._room_typing = {} - - def stream_positions(self): - # We must update this typing token from the response of the previous - # sync. In particular, the stream id may "reset" back to zero/a low - # value which we *must* use for the next replication request. - return {"typing": self._latest_room_serial} - - def process_replication_rows(self, token, rows): - if self._latest_room_serial > token: - # The master has gone backwards. To prevent inconsistent data, just - # clear everything. - self._reset() - - # Set the latest serial token to whatever the server gave us. - self._latest_room_serial = token - - for row in rows: - self._room_serials[row.room_id] = token - self._room_typing[row.room_id] = row.user_ids - - def get_current_token(self) -> int: - return self._latest_room_serial - - class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. @@ -619,17 +581,15 @@ class GenericWorkerServer(HomeServer): def build_presence_handler(self): return GenericWorkerPresence(self) - def build_typing_handler(self): - return GenericWorkerTyping(self) - def build_replication_data_handler(self): return GenericWorkerReplicationHandler(self) -class GenericWorkerReplicationHandler(WorkerReplicationDataHandler): +class GenericWorkerReplicationHandler(ReplicationDataHandler): def __init__(self, hs): + super().__init__(hs) + self.store = hs.get_datastore() - self.typing_handler = hs.get_typing_handler() # NB this is a SynchrotronPresence, not a normal PresenceHandler self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() @@ -643,14 +603,12 @@ class GenericWorkerReplicationHandler(WorkerReplicationDataHandler): self.send_handler = None async def on_rdata(self, stream_name, token, rows): - await super(GenericWorkerReplicationHandler, self).on_rdata( - stream_name, token, rows - ) + await super().on_rdata(stream_name, token, rows) run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): - args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate() - args.update(self.typing_handler.stream_positions()) + args = super().get_streams_to_replicate() + if self.send_handler: args.update(self.send_handler.stream_positions()) return args @@ -698,11 +656,6 @@ class GenericWorkerReplicationHandler(WorkerReplicationDataHandler): await self.pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} ) - elif stream_name == TypingStream.NAME: - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] - ) elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: @@ -938,6 +891,8 @@ def start(config_options): # Force the pushers to start since they will be disabled in the main config config.send_federation = True + config.server.handle_typing = False + synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts ss = GenericWorkerServer( diff --git a/synapse/config/server.py b/synapse/config/server.py index 7525765fee..0f69297cc1 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -83,6 +83,8 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + self.handle_typing = config.get("handle_typing", True) + # Whether to enable user presence. self.use_presence = config.get("use_presence", True) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c7bc14c623..a6edea412b 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError from synapse.logging.context import run_in_background +from synapse.replication.tcp.streams import TypingStream from synapse.types import UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure @@ -288,6 +289,52 @@ class TypingHandler(object): return self._latest_room_serial +class TypingSlaveHandler(object): + def __init__(self, hs): + self.notifier = hs.get_notifier() + + self._latest_room_serial = 0 + self._reset() + + def _reset(self): + """ + Reset the typing handler's data caches. + """ + # map room IDs to serial numbers + self._room_serials = {} + # map room IDs to sets of users currently typing + self._room_typing = {} + + def stream_positions(self): + # We must update this typing token from the response of the previous + # sync. In particular, the stream id may "reset" back to zero/a low + # value which we *must* use for the next replication request. + return {"typing": self._latest_room_serial} + + def process_replication_rows(self, stream_name, token, rows): + if stream_name != TypingStream.NAME: + return + + if self._latest_room_serial > token: + # The master has gone backwards. To prevent inconsistent data, just + # clear everything. + self._reset() + + # Set the latest serial token to whatever the server gave us. + self._latest_room_serial = token + + for row in rows: + self._room_serials[row.room_id] = token + self._room_typing[row.room_id] = row.user_ids + + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows] + ) + + def get_current_token(self) -> int: + return self._latest_room_serial + + class TypingNotificationEventSource(object): def __init__(self, hs): self.hs = hs diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b620b29dfb..c197f6c26d 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -329,10 +329,17 @@ class ReplicationClientHandler: self.send_command(RdataCommand(stream_name, token, data)) -class DummyReplicationDataHandler: +class ReplicationDataHandler: """A replication data handler that simply discards all data. """ + def __init__(self, hs): + self.store = hs.get_datastore() + self.typing_handler = hs.get_typing_handler() + + self.slaved_store = hs.config.worker_app is not None + self.slaved_typing = not hs.config.server.handle_typing + async def on_rdata(self, stream_name: str, token: int, rows: list): """Called to handle a batch of replication data with a given stream token. @@ -345,7 +352,11 @@ class DummyReplicationDataHandler: rows (list): a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ - pass + if self.slaved_store: + self.store.process_replication_rows(stream_name, token, rows) + + if self.slaved_typing: + self.typing_handler.process_replication_rows(stream_name, token, rows) def get_streams_to_replicate(self) -> Dict[str, int]: """Called when a new connection has been established and we need to @@ -355,49 +366,25 @@ class DummyReplicationDataHandler: map from stream name to the most recent update we have for that stream (ie, the point we want to start replicating from) """ - return {} - - async def on_position(self, stream_name: str, token: int): - pass - + args = {} # type: Dict[str, int] -class WorkerReplicationDataHandler: - """A replication data handler that calls slave data stores. - """ - - def __init__(self, store): - self.store = store - - async def on_rdata(self, stream_name: str, token: int, rows: list): - """Called to handle a batch of replication data with a given stream token. + if self.slaved_store: + args = self.store.stream_positions() + user_account_data = args.pop("user_account_data", None) + room_account_data = args.pop("room_account_data", None) + if user_account_data: + args["account_data"] = user_account_data + elif room_account_data: + args["account_data"] = room_account_data - By default this just pokes the slave store. Can be overridden in subclasses to - handle more. + if self.slaved_typing: + args.update(self.typing_handler.stream_positions()) - Args: - stream_name (str): name of the replication stream for this batch of rows - token (int): stream token for this batch of rows - rows (list): a list of Stream.ROW_TYPE objects as returned by - Stream.parse_row. - """ - self.store.process_replication_rows(stream_name, token, rows) - - def get_streams_to_replicate(self) -> Dict[str, int]: - """Called when a new connection has been established and we need to - subscribe to streams. - - Returns: - map from stream name to the most recent update we have for - that stream (ie, the point we want to start replicating from) - """ - args = self.store.stream_positions() - user_account_data = args.pop("user_account_data", None) - room_account_data = args.pop("room_account_data", None) - if user_account_data: - args["account_data"] = user_account_data - elif room_account_data: - args["account_data"] = room_account_data return args async def on_position(self, stream_name: str, token: int): - self.store.process_replication_rows(stream_name, token, []) + if self.slaved_store: + self.store.process_replication_rows(stream_name, token, []) + + if self.slaved_typing: + self.typing_handler.process_replication_rows(stream_name, token, []) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index b2a1785c08..c9d671210b 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -25,7 +25,7 @@ from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol -from synapse.replication.tcp.streams import STREAMS_MAP, Stream +from synapse.replication.tcp.streams import STREAMS_MAP, Stream, TypingStream from synapse.replication.tcp.streams.federation import FederationStream from synapse.util.metrics import Measure @@ -79,8 +79,14 @@ class ReplicationStreamer(object): # hase been disabled on the master. continue + if stream == TypingStream: + continue + self.streams.append(stream(hs)) + if hs.config.server.handle_typing: + self.streams.append(TypingStream(hs)) + self.streams_by_name = {stream.NAME: stream for stream in self.streams} self.notifier.add_replication_callback(self.on_notifier_poke) diff --git a/synapse/server.py b/synapse/server.py index 5f5d79161c..8955c32cc4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -78,7 +78,7 @@ from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler -from synapse.handlers.typing import TypingHandler +from synapse.handlers.typing import TypingHandler, TypingSlaveHandler from synapse.handlers.user_directory import UserDirectoryHandler from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient from synapse.http.matrixfederationclient import MatrixFederationHttpClient @@ -86,8 +86,8 @@ from synapse.notifier import Notifier from synapse.push.action_generator import ActionGenerator from synapse.push.pusherpool import PusherPool from synapse.replication.tcp.handler import ( - DummyReplicationDataHandler, ReplicationClientHandler, + ReplicationDataHandler, ) from synapse.replication.tcp.resource import ReplicationStreamer from synapse.rest.media.v1.media_repository import ( @@ -354,7 +354,10 @@ class HomeServer(object): return PresenceHandler(self) def build_typing_handler(self): - return TypingHandler(self) + if self.config.handle_typing: + return TypingHandler(self) + else: + return TypingSlaveHandler(self) def build_sync_handler(self): return SyncHandler(self) @@ -555,7 +558,7 @@ class HomeServer(object): return ReplicationStreamer(self) def build_replication_data_handler(self): - return DummyReplicationDataHandler() + return ReplicationDataHandler(self) def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) |