diff options
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r-- | synapse/app/federation_sender.py | 30 |
1 files changed, 30 insertions, 0 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index a461442fdc..9711a7147c 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore @@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams import ReceiptsStream from synapse.server import HomeServer from synapse.storage.engines import create_engine +from synapse.types import ReadReceipt from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, run_in_background @@ -202,6 +205,7 @@ class FederationSenderHandler(object): """ def __init__(self, hs, replication_client): self.store = hs.get_datastore() + self._is_mine_id = hs.is_mine_id self.federation_sender = hs.get_federation_sender() self.replication_client = replication_client @@ -234,6 +238,32 @@ class FederationSenderHandler(object): elif stream_name == "events": self.federation_sender.notify_new_events(token) + # ... and when new receipts happen + elif stream_name == ReceiptsStream.NAME: + run_as_background_process( + "process_receipts_for_federation", self._on_new_receipts, rows, + ) + + @defer.inlineCallbacks + def _on_new_receipts(self, rows): + """ + Args: + rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]): + new receipts to be processed + """ + for receipt in rows: + # we only want to send on receipts for our own users + if not self._is_mine_id(receipt.user_id): + continue + receipt_info = ReadReceipt( + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + [receipt.event_id], + receipt.data, + ) + yield self.federation_sender.send_read_receipt(receipt_info) + @defer.inlineCallbacks def update_token(self, token): try: |