diff options
author | Krombel <krombel@krombel.de> | 2017-06-21 14:48:21 +0200 |
---|---|---|
committer | Krombel <krombel@krombel.de> | 2017-06-21 14:48:21 +0200 |
commit | 4202fba82a255eb52783a91d873b91aa8ef2191c (patch) | |
tree | 73975e960b5a80f7a466cbf62810639662093d4a /synapse/app/user_dir.py | |
parent | replaced json.dumps with encode_canonical_json (diff) | |
parent | Merge pull request #2292 from matrix-org/erikj/quarantine_media (diff) | |
download | synapse-4202fba82a255eb52783a91d873b91aa8ef2191c.tar.xz |
Merge branch 'develop' into avoid_duplicate_filters
Diffstat (limited to 'synapse/app/user_dir.py')
-rw-r--r-- | synapse/app/user_dir.py | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py new file mode 100644 index 0000000000..9d8edaa8e3 --- /dev/null +++ b/synapse/app/user_dir.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig +from synapse.crypto import context_factory +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v2_alpha import user_directory +from synapse.storage.engines import create_engine +from synapse.storage.client_ips import ClientIpStore +from synapse.storage.user_directory import UserDirectoryStore +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.util.caches.stream_change_cache import StreamChangeCache + +from synapse import events + +from twisted.internet import reactor +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.user_dir") + + +class UserDirectorySlaveStore( + SlavedEventStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + UserDirectoryStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStore because the constructor is different +): + def __init__(self, db_conn, hs): + super(UserDirectorySlaveStore, self).__init__(db_conn, hs) + + events_max = self._stream_id_gen.get_current_token() + curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + db_conn, "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + + self._current_state_delta_pos = events_max + + def stream_positions(self): + result = super(UserDirectorySlaveStore, self).stream_positions() + result["current_state_deltas"] = self._current_state_delta_pos + return result + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "current_state_deltas": + self._current_state_delta_pos = token + for row in rows: + self._curr_state_delta_stream_cache.entity_has_changed( + row.room_id, token + ) + return super(UserDirectorySlaveStore, self).process_replication_rows( + stream_name, token, rows + ) + + +class UserDirectoryServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + user_directory.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + + for address in bind_addresses: + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=address + ) + + logger.info("Synapse user_dir now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + bind_addresses = listener["bind_addresses"] + + for address in bind_addresses: + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=address + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return UserDirectoryReplicationHandler(self) + + +class UserDirectoryReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) + self.user_directory = hs.get_user_directory_handler() + + def on_rdata(self, stream_name, token, rows): + super(UserDirectoryReplicationHandler, self).on_rdata( + stream_name, token, rows + ) + if stream_name == "current_state_deltas": + preserve_fn(self.user_directory.notify_new_event)() + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse user directory", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.user_dir" + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + if config.update_user_directory: + sys.stderr.write( + "\nThe update_user_directory must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``update_user_directory: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.update_user_directory = True + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ps = UserDirectoryServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening(config.worker_listeners) + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ps.get_datastore().start_profiling() + ps.get_state_handler().start_caching() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-user-dir", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) |