diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a461442fdc..9711a7147c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
+from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
"""
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
+ self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client
@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
elif stream_name == "events":
self.federation_sender.notify_new_events(token)
+ # ... and when new receipts happen
+ elif stream_name == ReceiptsStream.NAME:
+ run_as_background_process(
+ "process_receipts_for_federation", self._on_new_receipts, rows,
+ )
+
+ @defer.inlineCallbacks
+ def _on_new_receipts(self, rows):
+ """
+ Args:
+ rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+ new receipts to be processed
+ """
+ for receipt in rows:
+ # we only want to send on receipts for our own users
+ if not self._is_mine_id(receipt.user_id):
+ continue
+ receipt_info = ReadReceipt(
+ receipt.room_id,
+ receipt.receipt_type,
+ receipt.user_id,
+ [receipt.event_id],
+ receipt.data,
+ )
+ yield self.federation_sender.send_read_receipt(receipt_info)
+
@defer.inlineCallbacks
def update_token(self, token):
try:
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index b7d0b25781..bcb41da338 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -183,6 +183,15 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
+ def send_read_receipt(self, receipt):
+ """As per TransactionQueue
+
+ Args:
+ receipt (synapse.types.ReadReceipt):
+ """
+ # nothing to do here: the replication listener will handle it.
+ pass
+
def send_presence(self, states):
"""As per TransactionQueue
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index e5e42c647d..288cb5045c 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -290,6 +290,41 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
+ @defer.inlineCallbacks
+ def send_read_receipt(self, receipt):
+ """Send a RR to any other servers in the room
+
+ Args:
+ receipt (synapse.types.ReadReceipt): receipt to be sent
+ """
+ # Work out which remote servers should be poked and poke them.
+ domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
+ domains = [d for d in domains if d != self.server_name]
+ if not domains:
+ return
+
+ logger.debug("Sending receipt to: %r", domains)
+
+ content = {
+ receipt.room_id: {
+ receipt.receipt_type: {
+ receipt.user_id: {
+ "event_ids": receipt.event_ids,
+ "data": receipt.data,
+ },
+ },
+ },
+ }
+ key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
+
+ for domain in domains:
+ self.build_and_send_edu(
+ destination=domain,
+ edu_type="m.receipt",
+ content=content,
+ key=key,
+ )
+
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 733e7c3752..dd783ae134 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,9 +16,8 @@ import logging
from twisted.internet import defer
-from synapse.types import ReadReceipt, get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
logger = logging.getLogger(__name__)
@@ -87,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
# no new receipts
defer.returnValue(False)
- affected_room_ids = list(set([r["room_id"] for r in receipts]))
+ affected_room_ids = list(set([r.room_id for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
@@ -119,35 +118,7 @@ class ReceiptsHandler(BaseHandler):
if not is_new:
return
- # Work out which remote servers should be poked and poke them.
-
- # TODO: optimise this to move some of the work to the workers.
- data = receipt.data
-
- # XXX why does this not use state.get_current_hosts_in_room() ?
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.build_and_send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": [event_id],
- "data": data,
- }
- }
- },
- },
- key=(room_id, receipt_type, user_id),
- )
+ self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
|