summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/_base.py18
-rw-r--r--synapse/app/client_reader.py13
-rw-r--r--synapse/app/federation_reader.py6
-rw-r--r--synapse/app/federation_sender.py30
-rwxr-xr-xsynapse/app/homeserver.py17
5 files changed, 68 insertions, 16 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py

index 32e8b8a3f5..d4c6c4c8e2 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py
@@ -63,12 +63,13 @@ def start_worker_reactor(appname, config): start_reactor( appname, - config.soft_file_limit, - config.gc_thresholds, - config.worker_pid_file, - config.worker_daemonize, - config.worker_cpu_affinity, - logger, + soft_file_limit=config.soft_file_limit, + gc_thresholds=config.gc_thresholds, + pid_file=config.worker_pid_file, + daemonize=config.worker_daemonize, + cpu_affinity=config.worker_cpu_affinity, + print_pidfile=config.print_pidfile, + logger=logger, ) @@ -79,6 +80,7 @@ def start_reactor( pid_file, daemonize, cpu_affinity, + print_pidfile, logger, ): """ Run the reactor in the main process @@ -93,6 +95,7 @@ def start_reactor( pid_file (str): name of pid file to write to if daemonize is True daemonize (bool): true to run the reactor in a background process cpu_affinity (int|None): cpu affinity mask + print_pidfile (bool): whether to print the pid file, if daemonize is True logger (logging.Logger): logger instance to pass to Daemonize """ @@ -124,6 +127,9 @@ def start_reactor( reactor.run() if daemonize: + if print_pidfile: + print(pid_file) + daemon = Daemonize( app=appname, pid=pid_file, diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 043b48f8f3..beaea64a61 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py
@@ -33,9 +33,13 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore @@ -48,6 +52,8 @@ from synapse.rest.client.v1.room import ( RoomMemberListRestServlet, RoomStateRestServlet, ) +from synapse.rest.client.v2_alpha.account import ThreepidRestServlet +from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -60,6 +66,10 @@ logger = logging.getLogger("synapse.app.client_reader") class ClientReaderSlavedStore( + SlavedDeviceInboxStore, + SlavedDeviceStore, + SlavedReceiptsStore, + SlavedPushRuleStore, SlavedAccountDataStore, SlavedEventStore, SlavedKeyStore, @@ -96,6 +106,9 @@ class ClientReaderServer(HomeServer): RoomEventContextServlet(self).register(resource) RegisterRestServlet(self).register(resource) LoginRestServlet(self).register(resource) + ThreepidRestServlet(self).register(resource) + KeyQueryServlet(self).register(resource) + KeyChangesServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index b116c17669..7da79dc827 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource import synapse from synapse import events -from synapse.api.urls import FEDERATION_PREFIX +from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig @@ -44,6 +44,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -99,6 +100,9 @@ class FederationReaderServer(HomeServer): ), }) + if name in ["keys", "federation"]: + resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) + root_resource = create_resource_tree(resources, NoResource()) _base.listen_tcp( diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a461442fdc..9711a7147c 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore @@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams import ReceiptsStream from synapse.server import HomeServer from synapse.storage.engines import create_engine +from synapse.types import ReadReceipt from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, run_in_background @@ -202,6 +205,7 @@ class FederationSenderHandler(object): """ def __init__(self, hs, replication_client): self.store = hs.get_datastore() + self._is_mine_id = hs.is_mine_id self.federation_sender = hs.get_federation_sender() self.replication_client = replication_client @@ -234,6 +238,32 @@ class FederationSenderHandler(object): elif stream_name == "events": self.federation_sender.notify_new_events(token) + # ... and when new receipts happen + elif stream_name == ReceiptsStream.NAME: + run_as_background_process( + "process_receipts_for_federation", self._on_new_receipts, rows, + ) + + @defer.inlineCallbacks + def _on_new_receipts(self, rows): + """ + Args: + rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]): + new receipts to be processed + """ + for receipt in rows: + # we only want to send on receipts for our own users + if not self._is_mine_id(receipt.user_id): + continue + receipt_info = ReadReceipt( + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + [receipt.event_id], + receipt.data, + ) + yield self.federation_sender.send_read_receipt(receipt_info) + @defer.inlineCallbacks def update_token(self, token): try: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e8b6cc3114..869c028d1f 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py
@@ -376,6 +376,7 @@ def setup(config_options): logger.info("Database prepared in %s.", config.database_config['name']) hs.setup() + hs.setup_master() @defer.inlineCallbacks def do_acme(): @@ -636,17 +637,15 @@ def run(hs): # be quite busy the first few minutes clock.call_later(5 * 60, start_phone_stats_home) - if hs.config.daemonize and hs.config.print_pidfile: - print(hs.config.pid_file) - _base.start_reactor( "synapse-homeserver", - hs.config.soft_file_limit, - hs.config.gc_thresholds, - hs.config.pid_file, - hs.config.daemonize, - hs.config.cpu_affinity, - logger, + soft_file_limit=hs.config.soft_file_limit, + gc_thresholds=hs.config.gc_thresholds, + pid_file=hs.config.pid_file, + daemonize=hs.config.daemonize, + cpu_affinity=hs.config.cpu_affinity, + print_pidfile=hs.config.print_pidfile, + logger=logger, )