diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/appservice.py | 10 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 12 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 10 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 13 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 13 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 12 | ||||
-rw-r--r-- | synapse/app/pusher.py | 11 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 52 |
8 files changed, 103 insertions, 30 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 1900930053..a6f1e7594e 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -157,7 +157,7 @@ def start(config_options): assert config.worker_app == "synapse.app.appservice" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -187,7 +187,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 4d081eccd1..e4ea3ab933 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -29,13 +29,14 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.transactions import TransactionStore from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -63,6 +64,7 @@ class ClientReaderSlavedStore( DirectoryStore, SlavedApplicationServiceStore, SlavedRegistrationStore, + TransactionStore, BaseSlavedStore, ClientIpStore, # After BaseSlavedStore because the constructor is different ): @@ -171,7 +173,7 @@ def start(config_options): assert config.worker_app == "synapse.app.client_reader" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -193,7 +195,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 90a4816753..e52b0f240d 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -31,7 +31,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -162,7 +162,7 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_reader" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -184,7 +184,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index ec06620efb..76c4cc54d1 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -30,11 +30,12 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -56,7 +57,7 @@ logger = logging.getLogger("synapse.app.appservice") class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, + SlavedRegistrationStore, SlavedDeviceStore, ): pass @@ -159,7 +160,7 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_sender" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -192,7 +193,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e0b87468fe..2cdd2d39ff 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,6 +20,8 @@ import gc import logging import os import sys + +import synapse.config.logger from synapse.config._base import ConfigError from synapse.python_dependencies import ( @@ -50,7 +52,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX @@ -286,7 +288,7 @@ def setup(config_options): # generating config files and shouldn't try to continue. sys.exit(0) - config.setup_logging() + synapse.config.logger.setup_logging(config, use_worker_options=False) # check any extra requirements we have now we have a config check_requirements(config) @@ -454,7 +456,12 @@ def run(hs): def in_thread(): # Uncomment to enable tracing of log context changes. # sys.settrace(logcontext_tracer) - with LoggingContext("run"): + + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): change_resource_limit(hs.config.soft_file_limit) if hs.config.gc_thresholds: gc.set_threshold(*hs.config.gc_thresholds) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index ef17b158a5..1444e69a42 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -24,6 +24,7 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.transactions import TransactionStore from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.server import HomeServer @@ -32,7 +33,7 @@ from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -59,6 +60,7 @@ logger = logging.getLogger("synapse.app.media_repository") class MediaRepositorySlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, + TransactionStore, BaseSlavedStore, MediaRepositoryStore, ClientIpStore, @@ -168,7 +170,7 @@ def start(config_options): assert config.worker_app == "synapse.app.media_repository" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -190,7 +192,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 073f2c2489..ab682e52ec 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -245,7 +246,7 @@ def start(config_options): assert config.worker_app == "synapse.app.pusher" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -275,7 +276,11 @@ def start(config_options): ps.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4dfc2dc648..34e34e5580 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite from synapse.http.server import JsonResource @@ -39,6 +38,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.room import RoomStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore @@ -47,7 +47,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn, \ + PreserveLoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -77,6 +78,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, SlavedDeviceInboxStore, + SlavedDeviceStore, RoomStore, BaseSlavedStore, ClientIpStore, # After BaseSlavedStore because the constructor is different @@ -85,6 +87,10 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) + did_forget = ( + RoomMemberStore.__dict__["did_forget"] + ) + # XXX: This is a bit broken because we don't persist the accepted list in a # way that can be replicated. This means that we don't have a way to # invalidate the cache correctly. @@ -380,15 +386,40 @@ class SynchrotronServer(HomeServer): stream_key, position, users=users, rooms=rooms ) + @defer.inlineCallbacks + def notify_device_list_update(result): + stream = result.get("device_lists") + if not stream: + return + + position_index = stream["field_names"].index("position") + user_index = stream["field_names"].index("user_id") + + for row in stream["rows"]: + position = row[position_index] + user_id = row[user_index] + + room_ids = yield store.get_rooms_for_user(user_id) + + notifier.on_new_event( + "device_list_key", position, rooms=room_ids, + ) + + @defer.inlineCallbacks def notify(result): stream = result.get("events") if stream: max_position = stream["position"] + + event_map = yield store.get_events([row[1] for row in stream["rows"]]) + for row in stream["rows"]: position = row[0] - internal = json.loads(row[1]) - event_json = json.loads(row[2]) - event = FrozenEvent(event_json, internal_metadata_dict=internal) + event_id = row[1] + event = event_map.get(event_id, None) + if not event: + continue + extra_users = () if event.type == EventTypes.Member: extra_users = (event.state_key,) @@ -417,6 +448,7 @@ class SynchrotronServer(HomeServer): notify_from_stream( result, "to_device", "to_device_key", user="user_id" ) + yield notify_device_list_update(result) while True: try: @@ -427,7 +459,7 @@ class SynchrotronServer(HomeServer): yield store.process_replication(result) typing_handler.process_replication(result) yield presence_handler.process_replication(result) - notify(result) + yield notify(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(5) @@ -450,7 +482,7 @@ def start(config_options): assert config.worker_app == "synapse.app.synchrotron" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -469,7 +501,11 @@ def start(config_options): ss.start_listening(config.worker_listeners) def run(): - with LoggingContext("run"): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: |