From eed7271b3b1e754e1b164da61d225ca4326cfa0a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 12 Mar 2019 16:50:58 +0000 Subject: declare a ReadReceipt class I'm going to use this in queues and things, so it'll be useful to give it more of a structure. --- synapse/handlers/receipts.py | 46 +++++++++++++++++++++----------------------- synapse/types.py | 12 ++++++++++++ 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 1728089667..733e7c3752 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -16,7 +16,7 @@ import logging from twisted.internet import defer -from synapse.types import get_domain_from_id +from synapse.types import ReadReceipt, get_domain_from_id from ._base import BaseHandler @@ -42,13 +42,13 @@ class ReceiptsHandler(BaseHandler): """Called when we receive an EDU of type m.receipt from a remote HS. """ receipts = [ - { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_ids": user_values["event_ids"], - "data": user_values.get("data", {}), - } + ReadReceipt( + room_id=room_id, + receipt_type=receipt_type, + user_id=user_id, + event_ids=user_values["event_ids"], + data=user_values.get("data", {}), + ) for room_id, room_values in content.items() for receipt_type, users in room_values.items() for user_id, user_values in users.items() @@ -64,14 +64,12 @@ class ReceiptsHandler(BaseHandler): max_batch_id = None for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - res = yield self.store.insert_receipt( - room_id, receipt_type, user_id, event_ids, data + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + receipt.event_ids, + receipt.data, ) if not res: @@ -107,15 +105,15 @@ class ReceiptsHandler(BaseHandler): """Called when a client tells us a local user has read up to the given event_id in the room. """ - receipt = { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_ids": [event_id], - "data": { + receipt = ReadReceipt( + room_id=room_id, + receipt_type=receipt_type, + user_id=user_id, + event_ids=[event_id], + data={ "ts": int(self.clock.time_msec()), - } - } + }, + ) is_new = yield self._handle_new_receipts([receipt]) if not is_new: @@ -124,7 +122,7 @@ class ReceiptsHandler(BaseHandler): # Work out which remote servers should be poked and poke them. # TODO: optimise this to move some of the work to the workers. - data = receipt["data"] + data = receipt.data # XXX why does this not use state.get_current_hosts_in_room() ? users = yield self.state.get_current_user_in_room(room_id) diff --git a/synapse/types.py b/synapse/types.py index d8cb64addb..3de94b6335 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -16,6 +16,8 @@ import re import string from collections import namedtuple +import attr + from synapse.api.errors import SynapseError @@ -455,3 +457,13 @@ class ThirdPartyInstanceID( @classmethod def create(cls, appservice_id, network_id,): return cls(appservice_id=appservice_id, network_id=network_id) + + +@attr.s(slots=True) +class ReadReceipt(object): + """Information about a read-receipt""" + room_id = attr.ib() + receipt_type = attr.ib() + user_id = attr.ib() + event_ids = attr.ib() + data = attr.ib() -- cgit 1.4.1 From fdcad8eabdf20718d053255555fb27ac190613c4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 13 Mar 2019 15:55:37 +0000 Subject: Move client receipt processing to federation sender worker. This is mostly a prerequisite for #4730, but also fits with the general theme of "move everything off the master that we possibly can". --- synapse/app/federation_sender.py | 30 ++++++++++++++++++++++++++ synapse/federation/send_queue.py | 9 ++++++++ synapse/federation/transaction_queue.py | 35 +++++++++++++++++++++++++++++++ synapse/handlers/receipts.py | 37 ++++----------------------------- 4 files changed, 78 insertions(+), 33 deletions(-) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index a461442fdc..9711a7147c 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore @@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams import ReceiptsStream from synapse.server import HomeServer from synapse.storage.engines import create_engine +from synapse.types import ReadReceipt from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, run_in_background @@ -202,6 +205,7 @@ class FederationSenderHandler(object): """ def __init__(self, hs, replication_client): self.store = hs.get_datastore() + self._is_mine_id = hs.is_mine_id self.federation_sender = hs.get_federation_sender() self.replication_client = replication_client @@ -234,6 +238,32 @@ class FederationSenderHandler(object): elif stream_name == "events": self.federation_sender.notify_new_events(token) + # ... and when new receipts happen + elif stream_name == ReceiptsStream.NAME: + run_as_background_process( + "process_receipts_for_federation", self._on_new_receipts, rows, + ) + + @defer.inlineCallbacks + def _on_new_receipts(self, rows): + """ + Args: + rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]): + new receipts to be processed + """ + for receipt in rows: + # we only want to send on receipts for our own users + if not self._is_mine_id(receipt.user_id): + continue + receipt_info = ReadReceipt( + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + [receipt.event_id], + receipt.data, + ) + yield self.federation_sender.send_read_receipt(receipt_info) + @defer.inlineCallbacks def update_token(self, token): try: diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index b7d0b25781..bcb41da338 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -183,6 +183,15 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() + def send_read_receipt(self, receipt): + """As per TransactionQueue + + Args: + receipt (synapse.types.ReadReceipt): + """ + # nothing to do here: the replication listener will handle it. + pass + def send_presence(self, states): """As per TransactionQueue diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index e5e42c647d..288cb5045c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -290,6 +290,41 @@ class TransactionQueue(object): self._attempt_new_transaction(destination) + @defer.inlineCallbacks + def send_read_receipt(self, receipt): + """Send a RR to any other servers in the room + + Args: + receipt (synapse.types.ReadReceipt): receipt to be sent + """ + # Work out which remote servers should be poked and poke them. + domains = yield self.state.get_current_hosts_in_room(receipt.room_id) + domains = [d for d in domains if d != self.server_name] + if not domains: + return + + logger.debug("Sending receipt to: %r", domains) + + content = { + receipt.room_id: { + receipt.receipt_type: { + receipt.user_id: { + "event_ids": receipt.event_ids, + "data": receipt.data, + }, + }, + }, + } + key = (receipt.room_id, receipt.receipt_type, receipt.user_id) + + for domain in domains: + self.build_and_send_edu( + destination=domain, + edu_type="m.receipt", + content=content, + key=key, + ) + @logcontext.preserve_fn # the caller should not yield on this @defer.inlineCallbacks def send_presence(self, states): diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 733e7c3752..dd783ae134 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -16,9 +16,8 @@ import logging from twisted.internet import defer -from synapse.types import ReadReceipt, get_domain_from_id - -from ._base import BaseHandler +from synapse.handlers._base import BaseHandler +from synapse.types import ReadReceipt logger = logging.getLogger(__name__) @@ -87,7 +86,7 @@ class ReceiptsHandler(BaseHandler): # no new receipts defer.returnValue(False) - affected_room_ids = list(set([r["room_id"] for r in receipts])) + affected_room_ids = list(set([r.room_id for r in receipts])) self.notifier.on_new_event( "receipt_key", max_batch_id, rooms=affected_room_ids @@ -119,35 +118,7 @@ class ReceiptsHandler(BaseHandler): if not is_new: return - # Work out which remote servers should be poked and poke them. - - # TODO: optimise this to move some of the work to the workers. - data = receipt.data - - # XXX why does this not use state.get_current_hosts_in_room() ? - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.build_and_send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": [event_id], - "data": data, - } - } - }, - }, - key=(room_id, receipt_type, user_id), - ) + self.federation.send_read_receipt(receipt) @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): -- cgit 1.4.1 From 6accbd25bc4140719bbdd1b7412a349584cd8545 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 13 Mar 2019 17:24:10 +0000 Subject: changelog --- changelog.d/4852.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/4852.misc diff --git a/changelog.d/4852.misc b/changelog.d/4852.misc new file mode 100644 index 0000000000..76ab1e34e7 --- /dev/null +++ b/changelog.d/4852.misc @@ -0,0 +1 @@ + Move client read-receipt processing to federation sender worker. \ No newline at end of file -- cgit 1.4.1