diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 2906b93f6a..440341cb3c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -17,7 +17,7 @@
import contextlib
import logging
import sys
-from typing import Dict, Iterable
+from typing import Dict, Iterable, Optional, Set
from typing_extensions import ContextManager
@@ -677,10 +677,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
self.notify_pushers = hs.config.start_pushers
self.pusher_pool = hs.get_pusherpool()
+ self.send_handler = None # type: Optional[FederationSenderHandler]
if hs.config.send_federation:
- self.send_handler = FederationSenderHandler(hs, self)
- else:
- self.send_handler = None
+ self.send_handler = FederationSenderHandler(hs)
async def on_rdata(self, stream_name, instance_name, token, rows):
await super().on_rdata(stream_name, instance_name, token, rows)
@@ -718,7 +717,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
if entities:
self.notifier.on_new_event("to_device_key", token, users=entities)
elif stream_name == DeviceListsStream.NAME:
- all_room_ids = set()
+ all_room_ids = set() # type: Set[str]
for row in rows:
if row.entity.startswith("@"):
room_ids = await self.store.get_rooms_for_user(row.entity)
@@ -769,24 +768,33 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
class FederationSenderHandler(object):
- """Processes the replication stream and forwards the appropriate entries
- to the federation sender.
+ """Processes the fedration replication stream
+
+ This class is only instantiate on the worker responsible for sending outbound
+ federation transactions. It receives rows from the replication stream and forwards
+ the appropriate entries to the FederationSender class.
"""
- def __init__(self, hs: GenericWorkerServer, replication_client):
+ def __init__(self, hs: GenericWorkerServer):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
- self.replication_client = replication_client
-
+ self._hs = hs
+
+ # if the worker is restarted, we want to pick up where we left off in
+ # the replication stream, so load the position from the database.
+ #
+ # XXX is this actually worthwhile? Whenever the master is restarted, we'll
+ # drop some rows anyway (which is mostly fine because we're only dropping
+ # typing and presence notifications). If the replication stream is
+ # unreliable, why do we do all this hoop-jumping to store the position in the
+ # database? See also https://github.com/matrix-org/synapse/issues/7535.
+ #
self.federation_position = self.store.federation_out_pos_startup
- self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
+ self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
self._last_ack = self.federation_position
- self._room_serials = {}
- self._room_typing = {}
-
def on_start(self):
# There may be some events that are persisted but haven't been sent,
# so send them now.
@@ -849,22 +857,34 @@ class FederationSenderHandler(object):
await self.federation_sender.send_read_receipt(receipt_info)
async def update_token(self, token):
+ """Update the record of where we have processed to in the federation stream.
+
+ Called after we have processed a an update received over replication. Sends
+ a FEDERATION_ACK back to the master, and stores the token that we have processed
+ in `federation_stream_position` so that we can restart where we left off.
+ """
try:
self.federation_position = token
# We linearize here to ensure we don't have races updating the token
+ #
+ # XXX this appears to be redundant, since the ReplicationCommandHandler
+ # has a linearizer which ensures that we only process one line of
+ # replication data at a time. Should we remove it, or is it doing useful
+ # service for robustness? Or could we replace it with an assertion that
+ # we're not being re-entered?
+
with (await self._fed_position_linearizer.queue(None)):
- if self._last_ack < self.federation_position:
- await self.store.update_federation_out_pos(
- "federation", self.federation_position
- )
+ await self.store.update_federation_out_pos(
+ "federation", self.federation_position
+ )
- # We ACK this token over replication so that the master can drop
- # its in memory queues
- self.replication_client.send_federation_ack(
- self.federation_position
- )
- self._last_ack = self.federation_position
+ # We ACK this token over replication so that the master can drop
+ # its in memory queues
+ self._hs.get_tcp_replication().send_federation_ack(
+ self.federation_position
+ )
+ self._last_ack = self.federation_position
except Exception:
logger.exception("Error updating federation stream position")
|