diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/auth.py | 2 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 267 | ||||
-rw-r--r-- | synapse/config/workers.py | 2 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 24 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 25 | ||||
-rw-r--r-- | synapse/push/push_rule_evaluator.py | 4 | ||||
-rw-r--r-- | synapse/replication/slave/storage/client_ips.py | 3 | ||||
-rw-r--r-- | synapse/replication/tcp/commands.py | 14 | ||||
-rw-r--r-- | synapse/storage/account_data.py | 13 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 8 | ||||
-rw-r--r-- | synapse/visibility.py | 19 |
11 files changed, 325 insertions, 56 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index f8266d1c81..e3da45b416 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -209,7 +209,7 @@ class Auth(object): )[0] if user and access_token and ip_addr: self.store.insert_client_ip( - user=user, + user_id=user.to_string(), access_token=access_token, ip=ip_addr, user_agent=user_agent, diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py new file mode 100644 index 0000000000..c8fa7854ad --- /dev/null +++ b/synapse/app/frontend_proxy.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.crypto import context_factory +from synapse.api.errors import SynapseError +from synapse.http.servlet import ( + RestServlet, parse_json_object_from_request, +) +from synapse.rest.client.v2_alpha._base import client_v2_patterns + +from synapse import events + + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + + +logger = logging.getLogger("synapse.app.frontend_proxy") + + +class KeyUploadServlet(RestServlet): + PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$", + releases=()) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(KeyUploadServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.http_client = hs.get_simple_http_client() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def on_POST(self, request, device_id): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) + + if device_id is not None: + # passing the device_id here is deprecated; however, we allow it + # for now for compatibility with older clients. + if (requester.device_id is not None and + device_id != requester.device_id): + logger.warning("Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, device_id) + else: + device_id = requester.device_id + + if device_id is None: + raise SynapseError( + 400, + "To upload keys, you must pass device_id when authenticating" + ) + + if body: + # They're actually trying to upload something, proxy to main synapse. + result = yield self.http_client.post_json_get_json( + self.main_uri + request.uri, + body, + ) + + defer.returnValue((200, result)) + else: + # Just interested in counts. + result = yield self.store.count_e2e_one_time_keys(user_id, device_id) + defer.returnValue((200, {"one_time_key_counts": result})) + + +class FrontendProxySlavedStore( + SlavedDeviceStore, + SlavedClientIpStore, + BaseSlavedStore, +): + pass + + +class FrontendProxyServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + KeyUploadServlet(self).register(resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + + for address in bind_addresses: + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=address + ) + + logger.info("Synapse client reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + bind_addresses = listener["bind_addresses"] + + for address in bind_addresses: + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=address + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse frontend proxy", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.frontend_proxy" + + assert config.worker_main_http_uri is not None + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = FrontendProxyServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ss.get_state_handler().start_caching() + ss.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-frontend-proxy", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ea48d931a1..99d5d8aaeb 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -32,6 +32,8 @@ class WorkerConfig(Config): self.worker_replication_port = config.get("worker_replication_port", None) self.worker_name = config.get("worker_name", self.worker_app) + self.worker_main_http_uri = config.get("worker_main_http_uri", None) + if self.worker_listeners: for listener in self.worker_listeners: bind_address = listener.pop("bind_address", None) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 3d676e7d8b..a78f01e442 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -153,12 +153,10 @@ class Authenticator(object): class BaseFederationServlet(object): REQUIRE_AUTH = True - def __init__(self, handler, authenticator, ratelimiter, server_name, - room_list_handler): + def __init__(self, handler, authenticator, ratelimiter, server_name): self.handler = handler self.authenticator = authenticator self.ratelimiter = ratelimiter - self.room_list_handler = room_list_handler def _wrap(self, func): authenticator = self.authenticator @@ -590,7 +588,7 @@ class PublicRoomList(BaseFederationServlet): else: network_tuple = ThirdPartyInstanceID(None, None) - data = yield self.room_list_handler.get_local_public_room_list( + data = yield self.handler.get_local_public_room_list( limit, since_token, network_tuple=network_tuple ) @@ -611,7 +609,7 @@ class FederationVersionServlet(BaseFederationServlet): })) -SERVLET_CLASSES = ( +FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, FederationEventServlet, @@ -634,17 +632,27 @@ SERVLET_CLASSES = ( FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, OpenIdUserInfo, - PublicRoomList, FederationVersionServlet, ) +ROOM_LIST_CLASSES = ( + PublicRoomList, +) + def register_servlets(hs, resource, authenticator, ratelimiter): - for servletclass in SERVLET_CLASSES: + for servletclass in FEDERATION_SERVLET_CLASSES: servletclass( handler=hs.get_replication_layer(), authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, - room_list_handler=hs.get_room_list_handler(), + ).register(resource) + + for servletclass in ROOM_LIST_CLASSES: + servletclass( + handler=hs.get_room_list_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, ).register(resource) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 9a96e6fe8f..803ac3e75b 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -19,7 +19,6 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent -from synapse.visibility import filter_events_for_clients_context from synapse.api.constants import EventTypes, Membership from synapse.util.caches.descriptors import cached from synapse.util.async import Linearizer @@ -92,15 +91,6 @@ class BulkPushRuleEvaluator(object): rules_by_user = yield self._get_rules_for_event(event, context) actions_by_user = {} - # None of these users can be peeking since this list of users comes - # from the set of users in the room, so we know for sure they're all - # actually in the room. - user_tuples = [(u, False) for u in rules_by_user] - - filtered_by_user = yield filter_events_for_clients_context( - self.store, user_tuples, [event], {event.event_id: context} - ) - room_members = yield self.store.get_joined_users_from_context( event, context ) @@ -110,6 +100,14 @@ class BulkPushRuleEvaluator(object): condition_cache = {} for uid, rules in rules_by_user.iteritems(): + if event.sender == uid: + continue + + if not event.is_state(): + is_ignored = yield self.store.is_ignored_by(event.sender, uid) + if is_ignored: + continue + display_name = None profile_info = room_members.get(uid) if profile_info: @@ -121,13 +119,6 @@ class BulkPushRuleEvaluator(object): if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) - filtered = filtered_by_user[uid] - if len(filtered) == 0: - continue - - if filtered[0].sender == uid: - continue - for rule in rules: if 'enabled' in rule and not rule['enabled']: continue diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 4d88046579..172c27c137 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -200,7 +200,9 @@ def _glob_to_re(glob, word_boundary): return re.compile(r, flags=re.IGNORECASE) -def _flatten_dict(d, prefix=[], result={}): +def _flatten_dict(d, prefix=[], result=None): + if result is None: + result = {} for key, value in d.items(): if isinstance(value, basestring): result[".".join(prefix + [key])] = value.lower() diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index 65250285e8..352c9a2aa8 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -29,9 +29,8 @@ class SlavedClientIpStore(BaseSlavedStore): max_entries=50000 * CACHE_SIZE_FACTOR, ) - def insert_client_ip(self, user, access_token, ip, user_agent, device_id): + def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) - user_id = user.to_string() key = (user_id, access_token, ip) try: diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index a009214e43..171227cce2 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -323,14 +323,18 @@ class UserIpCommand(Command): @classmethod def from_line(cls, line): - user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5) + user_id, jsn = line.split(" ", 1) - return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen)) + access_token, ip, user_agent, device_id, last_seen = json.loads(jsn) + + return cls( + user_id, access_token, ip, user_agent, device_id, last_seen + ) def to_line(self): - return " ".join(( - self.user_id, self.access_token, self.ip, self.device_id, - str(self.last_seen), self.user_agent, + return self.user_id + " " + json.dumps(( + self.access_token, self.ip, self.user_agent, self.device_id, + self.last_seen, )) diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index aa84ffc2b0..ff14e54c11 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore): " WHERE stream_id < ?" ) txn.execute(update_max_id_sql, (next_id, next_id)) + + @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) + def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): + ignored_account_data = yield self.get_global_account_data_by_type_for_user( + "m.ignored_user_list", ignorer_user_id, + on_invalidate=cache_context.invalidate, + ) + if not ignored_account_data: + defer.returnValue(False) + + defer.returnValue( + ignored_user_id in ignored_account_data.get("ignored_users", {}) + ) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index fc468ea185..3c95e90eca 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -56,9 +56,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) - def insert_client_ip(self, user, access_token, ip, user_agent, device_id): - now = int(self._clock.time_msec()) - key = (user.to_string(), access_token, ip) + def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, + now=None): + if not now: + now = int(self._clock.time_msec()) + key = (user_id, access_token, ip) try: last_seen = self.client_ip_last_seen.get(key) diff --git a/synapse/visibility.py b/synapse/visibility.py index c4dd9ae2c7..5590b866ed 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -189,25 +189,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): @defer.inlineCallbacks -def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context): - user_ids = set(u[0] for u in user_tuples) - event_id_to_state = {} - for event_id, context in event_id_to_context.items(): - state = yield store.get_events([ - e_id - for key, e_id in context.current_state_ids.iteritems() - if key == (EventTypes.RoomHistoryVisibility, "") - or (key[0] == EventTypes.Member and key[1] in user_ids) - ]) - event_id_to_state[event_id] = state - - res = yield filter_events_for_clients( - store, user_tuples, events, event_id_to_state - ) - defer.returnValue(res) - - -@defer.inlineCallbacks def filter_events_for_client(store, user_id, events, is_peeking=False): """ Check which events a user is allowed to see |