diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/generic_worker.py | 68 |
1 files changed, 44 insertions, 24 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 2906b93f6a..440341cb3c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -17,7 +17,7 @@ import contextlib import logging import sys -from typing import Dict, Iterable +from typing import Dict, Iterable, Optional, Set from typing_extensions import ContextManager @@ -677,10 +677,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): self.notify_pushers = hs.config.start_pushers self.pusher_pool = hs.get_pusherpool() + self.send_handler = None # type: Optional[FederationSenderHandler] if hs.config.send_federation: - self.send_handler = FederationSenderHandler(hs, self) - else: - self.send_handler = None + self.send_handler = FederationSenderHandler(hs) async def on_rdata(self, stream_name, instance_name, token, rows): await super().on_rdata(stream_name, instance_name, token, rows) @@ -718,7 +717,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): if entities: self.notifier.on_new_event("to_device_key", token, users=entities) elif stream_name == DeviceListsStream.NAME: - all_room_ids = set() + all_room_ids = set() # type: Set[str] for row in rows: if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) @@ -769,24 +768,33 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): class FederationSenderHandler(object): - """Processes the replication stream and forwards the appropriate entries - to the federation sender. + """Processes the fedration replication stream + + This class is only instantiate on the worker responsible for sending outbound + federation transactions. It receives rows from the replication stream and forwards + the appropriate entries to the FederationSender class. """ - def __init__(self, hs: GenericWorkerServer, replication_client): + def __init__(self, hs: GenericWorkerServer): self.store = hs.get_datastore() self._is_mine_id = hs.is_mine_id self.federation_sender = hs.get_federation_sender() - self.replication_client = replication_client - + self._hs = hs + + # if the worker is restarted, we want to pick up where we left off in + # the replication stream, so load the position from the database. + # + # XXX is this actually worthwhile? Whenever the master is restarted, we'll + # drop some rows anyway (which is mostly fine because we're only dropping + # typing and presence notifications). If the replication stream is + # unreliable, why do we do all this hoop-jumping to store the position in the + # database? See also https://github.com/matrix-org/synapse/issues/7535. + # self.federation_position = self.store.federation_out_pos_startup - self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") self._last_ack = self.federation_position - self._room_serials = {} - self._room_typing = {} - def on_start(self): # There may be some events that are persisted but haven't been sent, # so send them now. @@ -849,22 +857,34 @@ class FederationSenderHandler(object): await self.federation_sender.send_read_receipt(receipt_info) async def update_token(self, token): + """Update the record of where we have processed to in the federation stream. + + Called after we have processed a an update received over replication. Sends + a FEDERATION_ACK back to the master, and stores the token that we have processed + in `federation_stream_position` so that we can restart where we left off. + """ try: self.federation_position = token # We linearize here to ensure we don't have races updating the token + # + # XXX this appears to be redundant, since the ReplicationCommandHandler + # has a linearizer which ensures that we only process one line of + # replication data at a time. Should we remove it, or is it doing useful + # service for robustness? Or could we replace it with an assertion that + # we're not being re-entered? + with (await self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - await self.store.update_federation_out_pos( - "federation", self.federation_position - ) + await self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack( - self.federation_position - ) - self._last_ack = self.federation_position + # We ACK this token over replication so that the master can drop + # its in memory queues + self._hs.get_tcp_replication().send_federation_ack( + self.federation_position + ) + self._last_ack = self.federation_position except Exception: logger.exception("Error updating federation stream position") |