diff options
-rw-r--r-- | CHANGES.rst | 9 | ||||
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 271 | ||||
-rw-r--r-- | synapse/config/workers.py | 2 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 2 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 25 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 4 | ||||
-rw-r--r-- | synapse/storage/account_data.py | 13 | ||||
-rw-r--r-- | synapse/visibility.py | 19 |
9 files changed, 307 insertions, 40 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index e7c12dd919..a415944756 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,12 @@ +Changes in synapse v0.22.1 (2017-07-06) +======================================= + +Bug fixes: + +* Fix bug where pusher pool didn't start and caused issues when + interacting with some rooms (PR #2342) + + Changes in synapse v0.22.0 (2017-07-06) ======================================= diff --git a/synapse/__init__.py b/synapse/__init__.py index 60af1cbecd..dbf22eca00 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.22.0" +__version__ = "0.22.1" diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py new file mode 100644 index 0000000000..132f18a979 --- /dev/null +++ b/synapse/app/frontend_proxy.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.crypto import context_factory +from synapse.api.errors import SynapseError +from synapse.http.servlet import ( + RestServlet, parse_json_object_from_request, +) +from synapse.rest.client.v2_alpha._base import client_v2_patterns + +from synapse import events + + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + + +logger = logging.getLogger("synapse.app.frontend_proxy") + + +class KeyUploadServlet(RestServlet): + PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$", + releases=()) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(KeyUploadServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.http_client = hs.get_simple_http_client() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def on_POST(self, request, device_id): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) + + if device_id is not None: + # passing the device_id here is deprecated; however, we allow it + # for now for compatibility with older clients. + if (requester.device_id is not None and + device_id != requester.device_id): + logger.warning("Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, device_id) + else: + device_id = requester.device_id + + if device_id is None: + raise SynapseError( + 400, + "To upload keys, you must pass device_id when authenticating" + ) + + if body: + # They're actually trying to upload something, proxy to main synapse. + result = yield self.http_client.post_json_get_json( + self.main_uri + request.uri, + body, + ) + + defer.returnValue((200, result)) + else: + # Just interested in counts. + result = yield self.store.count_e2e_one_time_keys(user_id, device_id) + defer.returnValue((200, {"one_time_key_counts": result})) + + +class FrontendProxySlavedStore( + SlavedDeviceStore, + SlavedClientIpStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + BaseSlavedStore, +): + pass + + +class FrontendProxyServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + KeyUploadServlet(self).register(resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + + for address in bind_addresses: + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=address + ) + + logger.info("Synapse client reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + bind_addresses = listener["bind_addresses"] + + for address in bind_addresses: + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=address + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse frontend proxy", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.frontend_proxy" + + assert config.worker_main_http_uri is not None + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = FrontendProxyServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ss.get_state_handler().start_caching() + ss.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-frontend-proxy", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ea48d931a1..99d5d8aaeb 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -32,6 +32,8 @@ class WorkerConfig(Config): self.worker_replication_port = config.get("worker_replication_port", None) self.worker_name = config.get("worker_name", self.worker_app) + self.worker_main_http_uri = config.get("worker_main_http_uri", None) + if self.worker_listeners: for listener in self.worker_listeners: bind_address = listener.pop("bind_address", None) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 694b820d85..b790a7c2ef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1413,7 +1413,7 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - if not event.internal_metadata.is_outlier(): + if not event.internal_metadata.is_outlier() and not backfilled: yield self.action_generator.handle_push_actions_for_event( event, context ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 9a96e6fe8f..803ac3e75b 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -19,7 +19,6 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent -from synapse.visibility import filter_events_for_clients_context from synapse.api.constants import EventTypes, Membership from synapse.util.caches.descriptors import cached from synapse.util.async import Linearizer @@ -92,15 +91,6 @@ class BulkPushRuleEvaluator(object): rules_by_user = yield self._get_rules_for_event(event, context) actions_by_user = {} - # None of these users can be peeking since this list of users comes - # from the set of users in the room, so we know for sure they're all - # actually in the room. - user_tuples = [(u, False) for u in rules_by_user] - - filtered_by_user = yield filter_events_for_clients_context( - self.store, user_tuples, [event], {event.event_id: context} - ) - room_members = yield self.store.get_joined_users_from_context( event, context ) @@ -110,6 +100,14 @@ class BulkPushRuleEvaluator(object): condition_cache = {} for uid, rules in rules_by_user.iteritems(): + if event.sender == uid: + continue + + if not event.is_state(): + is_ignored = yield self.store.is_ignored_by(event.sender, uid) + if is_ignored: + continue + display_name = None profile_info = room_members.get(uid) if profile_info: @@ -121,13 +119,6 @@ class BulkPushRuleEvaluator(object): if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) - filtered = filtered_by_user[uid] - if len(filtered) == 0: - continue - - if filtered[0].sender == uid: - continue - for rule in rules: if 'enabled' in rule and not rule['enabled']: continue diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 062272f8dd..d59503b905 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -244,7 +244,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): becoming full. """ if self.state == ConnectionStates.CLOSED: - logger.info("[%s] Not sending, connection closed", self.id()) + logger.debug("[%s] Not sending, connection closed", self.id()) return if do_buffer and self.state != ConnectionStates.ESTABLISHED: @@ -264,7 +264,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def _queue_command(self, cmd): """Queue the command until the connection is ready to write to again. """ - logger.info("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd) + logger.debug("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd) self.pending_commands.append(cmd) if len(self.pending_commands) > self.max_line_buffer: diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index aa84ffc2b0..ff14e54c11 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore): " WHERE stream_id < ?" ) txn.execute(update_max_id_sql, (next_id, next_id)) + + @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) + def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): + ignored_account_data = yield self.get_global_account_data_by_type_for_user( + "m.ignored_user_list", ignorer_user_id, + on_invalidate=cache_context.invalidate, + ) + if not ignored_account_data: + defer.returnValue(False) + + defer.returnValue( + ignored_user_id in ignored_account_data.get("ignored_users", {}) + ) diff --git a/synapse/visibility.py b/synapse/visibility.py index c4dd9ae2c7..5590b866ed 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -189,25 +189,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): @defer.inlineCallbacks -def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context): - user_ids = set(u[0] for u in user_tuples) - event_id_to_state = {} - for event_id, context in event_id_to_context.items(): - state = yield store.get_events([ - e_id - for key, e_id in context.current_state_ids.iteritems() - if key == (EventTypes.RoomHistoryVisibility, "") - or (key[0] == EventTypes.Member and key[1] in user_ids) - ]) - event_id_to_state[event_id] = state - - res = yield filter_events_for_clients( - store, user_tuples, events, event_id_to_state - ) - defer.returnValue(res) - - -@defer.inlineCallbacks def filter_events_for_client(store, user_id, events, is_peeking=False): """ Check which events a user is allowed to see |