diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 32e8b8a3f5..d4c6c4c8e2 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -63,12 +63,13 @@ def start_worker_reactor(appname, config):
start_reactor(
appname,
- config.soft_file_limit,
- config.gc_thresholds,
- config.worker_pid_file,
- config.worker_daemonize,
- config.worker_cpu_affinity,
- logger,
+ soft_file_limit=config.soft_file_limit,
+ gc_thresholds=config.gc_thresholds,
+ pid_file=config.worker_pid_file,
+ daemonize=config.worker_daemonize,
+ cpu_affinity=config.worker_cpu_affinity,
+ print_pidfile=config.print_pidfile,
+ logger=logger,
)
@@ -79,6 +80,7 @@ def start_reactor(
pid_file,
daemonize,
cpu_affinity,
+ print_pidfile,
logger,
):
""" Run the reactor in the main process
@@ -93,6 +95,7 @@ def start_reactor(
pid_file (str): name of pid file to write to if daemonize is True
daemonize (bool): true to run the reactor in a background process
cpu_affinity (int|None): cpu affinity mask
+ print_pidfile (bool): whether to print the pid file, if daemonize is True
logger (logging.Logger): logger instance to pass to Daemonize
"""
@@ -124,6 +127,9 @@ def start_reactor(
reactor.run()
if daemonize:
+ if print_pidfile:
+ print(pid_file)
+
daemon = Daemonize(
app=appname,
pid=pid_file,
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 043b48f8f3..beaea64a61 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -33,9 +33,13 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
@@ -48,6 +52,8 @@ from synapse.rest.client.v1.room import (
RoomMemberListRestServlet,
RoomStateRestServlet,
)
+from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
+from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -60,6 +66,10 @@ logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore(
+ SlavedDeviceInboxStore,
+ SlavedDeviceStore,
+ SlavedReceiptsStore,
+ SlavedPushRuleStore,
SlavedAccountDataStore,
SlavedEventStore,
SlavedKeyStore,
@@ -96,6 +106,9 @@ class ClientReaderServer(HomeServer):
RoomEventContextServlet(self).register(resource)
RegisterRestServlet(self).register(resource)
LoginRestServlet(self).register(resource)
+ ThreepidRestServlet(self).register(resource)
+ KeyQueryServlet(self).register(resource)
+ KeyChangesServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index b116c17669..7da79dc827 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
-from synapse.api.urls import FEDERATION_PREFIX
+from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@@ -44,6 +44,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@@ -99,6 +100,9 @@ class FederationReaderServer(HomeServer):
),
})
+ if name in ["keys", "federation"]:
+ resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
+
root_resource = create_resource_tree(resources, NoResource())
_base.listen_tcp(
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a461442fdc..9711a7147c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
+from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
"""
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
+ self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client
@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
elif stream_name == "events":
self.federation_sender.notify_new_events(token)
+ # ... and when new receipts happen
+ elif stream_name == ReceiptsStream.NAME:
+ run_as_background_process(
+ "process_receipts_for_federation", self._on_new_receipts, rows,
+ )
+
+ @defer.inlineCallbacks
+ def _on_new_receipts(self, rows):
+ """
+ Args:
+ rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+ new receipts to be processed
+ """
+ for receipt in rows:
+ # we only want to send on receipts for our own users
+ if not self._is_mine_id(receipt.user_id):
+ continue
+ receipt_info = ReadReceipt(
+ receipt.room_id,
+ receipt.receipt_type,
+ receipt.user_id,
+ [receipt.event_id],
+ receipt.data,
+ )
+ yield self.federation_sender.send_read_receipt(receipt_info)
+
@defer.inlineCallbacks
def update_token(self, token):
try:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e8b6cc3114..869c028d1f 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -376,6 +376,7 @@ def setup(config_options):
logger.info("Database prepared in %s.", config.database_config['name'])
hs.setup()
+ hs.setup_master()
@defer.inlineCallbacks
def do_acme():
@@ -636,17 +637,15 @@ def run(hs):
# be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home)
- if hs.config.daemonize and hs.config.print_pidfile:
- print(hs.config.pid_file)
-
_base.start_reactor(
"synapse-homeserver",
- hs.config.soft_file_limit,
- hs.config.gc_thresholds,
- hs.config.pid_file,
- hs.config.daemonize,
- hs.config.cpu_affinity,
- logger,
+ soft_file_limit=hs.config.soft_file_limit,
+ gc_thresholds=hs.config.gc_thresholds,
+ pid_file=hs.config.pid_file,
+ daemonize=hs.config.daemonize,
+ cpu_affinity=hs.config.cpu_affinity,
+ print_pidfile=hs.config.print_pidfile,
+ logger=logger,
)
|