diff options
Diffstat (limited to 'synapse')
76 files changed, 2411 insertions, 1338 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 3cde33c0d7..5c0f2f83aa 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -17,4 +17,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.32.2" +__version__ = "0.33.0" diff --git a/synapse/api/auth.py b/synapse/api/auth.py index bc629832d9..073229b4c4 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -65,8 +65,9 @@ class Auth(object): @defer.inlineCallbacks def check_from_context(self, event, context, do_sig_check=True): + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -544,7 +545,8 @@ class Auth(object): @defer.inlineCallbacks def add_auth_events(self, builder, context): - auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids) + prev_state_ids = yield context.get_prev_state_ids(self.store) + auth_ids = yield self.compute_auth_events(builder, prev_state_ids) auth_events_entries = yield self.store.add_event_hashes( auth_ids @@ -737,3 +739,37 @@ class Auth(object): ) return query_params[0] + + @defer.inlineCallbacks + def check_in_room_or_world_readable(self, room_id, user_id): + """Checks that the user is or was in the room or the room is world + readable. If it isn't then an exception is raised. + + Returns: + Deferred[tuple[str, str|None]]: Resolves to the current membership of + the user in the room and the membership event ID of the user. If + the user is not in the room and never has been, then + `(Membership.JOIN, None)` is returned. + """ + + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + except AuthError: + visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 25346baa87..186831e118 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -113,7 +113,13 @@ ROOM_EVENT_FILTER_SCHEMA = { }, "contains_url": { "type": "boolean" - } + }, + "lazy_load_members": { + "type": "boolean" + }, + "include_redundant_members": { + "type": "boolean" + }, } } @@ -261,6 +267,12 @@ class FilterCollection(object): def ephemeral_limit(self): return self._room_ephemeral_filter.limit() + def lazy_load_members(self): + return self._room_state_filter.lazy_load_members() + + def include_redundant_members(self): + return self._room_state_filter.include_redundant_members() + def filter_presence(self, events): return self._presence_filter.filter(events) @@ -417,6 +429,12 @@ class Filter(object): def limit(self): return self.filter_json.get("limit", 10) + def lazy_load_members(self): + return self.filter_json.get("lazy_load_members", False) + + def include_redundant_members(self): + return self.filter_json.get("include_redundant_members", False) + def _matches_wildcard(actual_value, filter_value): if filter_value.endswith("*"): diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index b0ea26dcb4..e2c91123db 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -31,6 +31,7 @@ from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.directory import DirectoryStore @@ -40,7 +41,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v1.room import PublicRoomListRestServlet +from synapse.rest.client.v1.room import ( + JoinedRoomMemberListRestServlet, + PublicRoomListRestServlet, + RoomEventContextServlet, + RoomMemberListRestServlet, + RoomStateRestServlet, +) from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -52,6 +59,7 @@ logger = logging.getLogger("synapse.app.client_reader") class ClientReaderSlavedStore( + SlavedAccountDataStore, SlavedEventStore, SlavedKeyStore, RoomStore, @@ -82,7 +90,13 @@ class ClientReaderServer(HomeServer): resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) + PublicRoomListRestServlet(self).register(resource) + RoomMemberListRestServlet(self).register(resource) + JoinedRoomMemberListRestServlet(self).register(resource) + RoomStateRestServlet(self).register(resource) + RoomEventContextServlet(self).register(resource) + resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 14e6dca522..57b815d777 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -18,6 +18,8 @@ import logging import os import sys +from six import iteritems + from twisted.application import service from twisted.internet import defer, reactor from twisted.web.resource import EncodingResourceWrapper, NoResource @@ -47,6 +49,7 @@ from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.module_api import ModuleApi from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements @@ -425,6 +428,9 @@ def run(hs): # currently either 0 or 1 stats_process = [] + def start_phone_stats_home(): + return run_as_background_process("phone_stats_home", phone_stats_home) + @defer.inlineCallbacks def phone_stats_home(): logger.info("Gathering stats for reporting") @@ -442,7 +448,7 @@ def run(hs): stats["total_nonbridged_users"] = total_nonbridged_users daily_user_type_results = yield hs.get_datastore().count_daily_user_type() - for name, count in daily_user_type_results.iteritems(): + for name, count in iteritems(daily_user_type_results): stats["daily_user_type_" + name] = count room_count = yield hs.get_datastore().get_room_count() @@ -453,7 +459,7 @@ def run(hs): stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() r30_results = yield hs.get_datastore().count_r30_users() - for name, count in r30_results.iteritems(): + for name, count in iteritems(r30_results): stats["r30_users_" + name] = count daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() @@ -496,7 +502,10 @@ def run(hs): ) def generate_user_daily_visit_stats(): - hs.get_datastore().generate_user_daily_visits() + return run_as_background_process( + "generate_user_daily_visits", + hs.get_datastore().generate_user_daily_visits, + ) # Rather than update on per session basis, batch up the requests. # If you increase the loop period, the accuracy of user_daily_visits @@ -505,7 +514,7 @@ def run(hs): if hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") - clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000) + clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) # We need to defer this init for the cases that we daemonize # otherwise the process ID we get is that of the non-daemon process @@ -513,7 +522,7 @@ def run(hs): # We wait 5 minutes to send the first set of stats as the server can # be quite busy the first few minutes - clock.call_later(5 * 60, phone_stats_home) + clock.call_later(5 * 60, start_phone_stats_home) if hs.config.daemonize and hs.config.print_pidfile: print (hs.config.pid_file) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 26b9ec85f2..e201f18efd 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -55,7 +55,6 @@ from synapse.rest.client.v2_alpha import sync from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState -from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole @@ -81,9 +80,7 @@ class SynchrotronSlavedStore( RoomStore, BaseSlavedStore, ): - did_forget = ( - RoomMemberStore.__dict__["did_forget"] - ) + pass UPDATE_SYNCING_USERS_MS = 10 * 1000 diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 68acc15a9a..d658f967ba 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -25,6 +25,8 @@ import subprocess import sys import time +from six import iteritems + import yaml SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"] @@ -173,7 +175,7 @@ def main(): os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) cache_factors = config.get("synctl_cache_factors", {}) - for cache_name, factor in cache_factors.iteritems(): + for cache_name, factor in iteritems(cache_factors): os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) worker_configfiles = [] diff --git a/synapse/config/voip.py b/synapse/config/voip.py index 3a4e16fa96..d07bd24ffd 100644 --- a/synapse/config/voip.py +++ b/synapse/config/voip.py @@ -30,10 +30,10 @@ class VoipConfig(Config): ## Turn ## # The public URIs of the TURN server to give to clients - turn_uris: [] + #turn_uris: [] # The shared secret used to compute passwords for the TURN server - turn_shared_secret: "YOUR_SHARED_SECRET" + #turn_shared_secret: "YOUR_SHARED_SECRET" # The Username and password if the TURN server needs them and # does not use a token diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index bcd9bb5946..368b5f6ae4 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,22 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import iteritems + from frozendict import frozendict from twisted.internet import defer +from synapse.util.logcontext import make_deferred_yieldable, run_in_background + class EventContext(object): """ Attributes: - current_state_ids (dict[(str, str), str]): - The current state map including the current event. - (type, state_key) -> event_id - - prev_state_ids (dict[(str, str), str]): - The current state map excluding the current event. - (type, state_key) -> event_id - state_group (int|None): state group id, if the state has been stored as a state group. This is usually only None if e.g. the event is an outlier. @@ -45,38 +41,77 @@ class EventContext(object): prev_state_events (?): XXX: is this ever set to anything other than the empty list? + + _current_state_ids (dict[(str, str), str]|None): + The current state map including the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _prev_state_ids (dict[(str, str), str]|None): + The current state map excluding the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have + been calculated. None if we haven't started calculating yet + + _event_type (str): The type of the event the context is associated with. + Only set when state has not been fetched yet. + + _event_state_key (str|None): The state_key of the event the context is + associated with. Only set when state has not been fetched yet. + + _prev_state_id (str|None): If the event associated with the context is + a state event, then `_prev_state_id` is the event_id of the state + that was replaced. + Only set when state has not been fetched yet. """ __slots__ = [ - "current_state_ids", - "prev_state_ids", "state_group", "rejected", "prev_group", "delta_ids", "prev_state_events", "app_service", + "_current_state_ids", + "_prev_state_ids", + "_prev_state_id", + "_event_type", + "_event_state_key", + "_fetching_state_deferred", ] def __init__(self): + self.prev_state_events = [] + self.rejected = False + self.app_service = None + + @staticmethod + def with_state(state_group, current_state_ids, prev_state_ids, + prev_group=None, delta_ids=None): + context = EventContext() + # The current state including the current event - self.current_state_ids = None + context._current_state_ids = current_state_ids # The current state excluding the current event - self.prev_state_ids = None - self.state_group = None + context._prev_state_ids = prev_state_ids + context.state_group = state_group - self.rejected = False + context._prev_state_id = None + context._event_type = None + context._event_state_key = None + context._fetching_state_deferred = defer.succeed(None) # A previously persisted state group and a delta between that # and this state. - self.prev_group = None - self.delta_ids = None + context.prev_group = prev_group + context.delta_ids = delta_ids - self.prev_state_events = None - - self.app_service = None + return context - def serialize(self, event): + @defer.inlineCallbacks + def serialize(self, event, store): """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` @@ -92,11 +127,12 @@ class EventContext(object): # the prev_state_ids, so if we're a state event we include the event # id that we replaced in the state. if event.is_state(): - prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield self.get_prev_state_ids(store) + prev_state_id = prev_state_ids.get((event.type, event.state_key)) else: prev_state_id = None - return { + defer.returnValue({ "prev_state_id": prev_state_id, "event_type": event.type, "event_state_key": event.state_key if event.is_state() else None, @@ -106,10 +142,9 @@ class EventContext(object): "delta_ids": _encode_state_dict(self.delta_ids), "prev_state_events": self.prev_state_events, "app_service_id": self.app_service.id if self.app_service else None - } + }) @staticmethod - @defer.inlineCallbacks def deserialize(store, input): """Converts a dict that was produced by `serialize` back into a EventContext. @@ -122,32 +157,115 @@ class EventContext(object): EventContext """ context = EventContext() + + # We use the state_group and prev_state_id stuff to pull the + # current_state_ids out of the DB and construct prev_state_ids. + context._prev_state_id = input["prev_state_id"] + context._event_type = input["event_type"] + context._event_state_key = input["event_state_key"] + + context._current_state_ids = None + context._prev_state_ids = None + context._fetching_state_deferred = None + context.state_group = input["state_group"] - context.rejected = input["rejected"] context.prev_group = input["prev_group"] context.delta_ids = _decode_state_dict(input["delta_ids"]) + + context.rejected = input["rejected"] context.prev_state_events = input["prev_state_events"] - # We use the state_group and prev_state_id stuff to pull the - # current_state_ids out of the DB and construct prev_state_ids. - prev_state_id = input["prev_state_id"] - event_type = input["event_type"] - event_state_key = input["event_state_key"] + app_service_id = input["app_service_id"] + if app_service_id: + context.app_service = store.get_app_service_by_id(app_service_id) + + return context + + @defer.inlineCallbacks + def get_current_state_ids(self, store): + """Gets the current state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._current_state_ids) + + @defer.inlineCallbacks + def get_prev_state_ids(self, store): + """Gets the prev state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) - context.current_state_ids = yield store.get_state_ids_for_group( - context.state_group, + defer.returnValue(self._prev_state_ids) + + def get_cached_current_state_ids(self): + """Gets the current state IDs if we have them already cached. + + Returns: + dict[(str, str), str]|None: Returns None if we haven't cached the + state or if state_group is None, which happens when the associated + event is an outlier. + """ + + return self._current_state_ids + + @defer.inlineCallbacks + def _fill_out_state(self, store): + """Called to populate the _current_state_ids and _prev_state_ids + attributes by loading from the database. + """ + if self.state_group is None: + return + + self._current_state_ids = yield store.get_state_ids_for_group( + self.state_group, ) - if prev_state_id and event_state_key: - context.prev_state_ids = dict(context.current_state_ids) - context.prev_state_ids[(event_type, event_state_key)] = prev_state_id + if self._prev_state_id and self._event_state_key is not None: + self._prev_state_ids = dict(self._current_state_ids) + + key = (self._event_type, self._event_state_key) + self._prev_state_ids[key] = self._prev_state_id else: - context.prev_state_ids = context.current_state_ids + self._prev_state_ids = self._current_state_ids - app_service_id = input["app_service_id"] - if app_service_id: - context.app_service = store.get_app_service_by_id(app_service_id) + @defer.inlineCallbacks + def update_state(self, state_group, prev_state_ids, current_state_ids, + prev_group, delta_ids): + """Replace the state in the context + """ + + # We need to make sure we wait for any ongoing fetching of state + # to complete so that the updated state doesn't get clobbered + if self._fetching_state_deferred: + yield make_deferred_yieldable(self._fetching_state_deferred) + + self.state_group = state_group + self._prev_state_ids = prev_state_ids + self.prev_group = prev_group + self._current_state_ids = current_state_ids + self.delta_ids = delta_ids - defer.returnValue(context) + # We need to ensure that that we've marked as having fetched the state + self._fetching_state_deferred = defer.succeed(None) def _encode_state_dict(state_dict): @@ -159,7 +277,7 @@ def _encode_state_dict(state_dict): return [ (etype, state_key, v) - for (etype, state_key), v in state_dict.iteritems() + for (etype, state_key), v in iteritems(state_dict) ] diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 48f26db67c..657935d1ac 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -24,6 +24,7 @@ from prometheus_client import Counter from twisted.internet import defer from twisted.internet.abstract import isIPAddress +from twisted.python import failure from synapse.api.constants import EventTypes from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError @@ -186,8 +187,12 @@ class FederationServer(FederationBase): logger.warn("Error handling PDU %s: %s", event_id, e) pdu_results[event_id] = {"error": str(e)} except Exception as e: + f = failure.Failure() pdu_results[event_id] = {"error": str(e)} - logger.exception("Failed to handle PDU %s", event_id) + logger.error( + "Failed to handle PDU %s: %s", + event_id, f.getTraceback().rstrip(), + ) yield async.concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), @@ -202,10 +207,6 @@ class FederationServer(FederationBase): edu.content ) - pdu_failures = getattr(transaction, "pdu_failures", []) - for failure in pdu_failures: - logger.info("Got failure %r", failure) - response = { "pdus": pdu_results, } diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 5157c3860d..0bb468385d 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -62,8 +62,6 @@ class FederationRemoteSendQueue(object): self.edus = SortedDict() # stream position -> Edu - self.failures = SortedDict() # stream position -> (destination, Failure) - self.device_messages = SortedDict() # stream position -> destination self.pos = 1 @@ -79,7 +77,7 @@ class FederationRemoteSendQueue(object): for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", - "edus", "failures", "device_messages", "pos_time", + "edus", "device_messages", "pos_time", ]: register(queue_name, getattr(self, queue_name)) @@ -149,12 +147,6 @@ class FederationRemoteSendQueue(object): for key in keys[:i]: del self.edus[key] - # Delete things out of failure map - keys = self.failures.keys() - i = self.failures.bisect_left(position_to_delete) - for key in keys[:i]: - del self.failures[key] - # Delete things out of device map keys = self.device_messages.keys() i = self.device_messages.bisect_left(position_to_delete) @@ -204,13 +196,6 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() - def send_failure(self, failure, destination): - """As per TransactionQueue""" - pos = self._next_pos() - - self.failures[pos] = (destination, str(failure)) - self.notifier.on_new_replication_data() - def send_device_messages(self, destination): """As per TransactionQueue""" pos = self._next_pos() @@ -285,17 +270,6 @@ class FederationRemoteSendQueue(object): for (pos, edu) in edus: rows.append((pos, EduRow(edu))) - # Fetch changed failures - i = self.failures.bisect_right(from_token) - j = self.failures.bisect_right(to_token) + 1 - failures = self.failures.items()[i:j] - - for (pos, (destination, failure)) in failures: - rows.append((pos, FailureRow( - destination=destination, - failure=failure, - ))) - # Fetch changed device messages i = self.device_messages.bisect_right(from_token) j = self.device_messages.bisect_right(to_token) + 1 @@ -417,34 +391,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ( buff.edus.setdefault(self.edu.destination, []).append(self.edu) -class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( - "destination", # str - "failure", -))): - """Streams failures to a remote server. Failures are issued when there was - something wrong with a transaction the remote sent us, e.g. it included - an event that was invalid. - """ - - TypeId = "f" - - @staticmethod - def from_data(data): - return FailureRow( - destination=data["destination"], - failure=data["failure"], - ) - - def to_data(self): - return { - "destination": self.destination, - "failure": self.failure, - } - - def add_to_buffer(self, buff): - buff.failures.setdefault(self.destination, []).append(self.failure) - - class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( "destination", # str ))): @@ -471,7 +417,6 @@ TypeToRow = { PresenceRow, KeyedEduRow, EduRow, - FailureRow, DeviceRow, ) } @@ -481,7 +426,6 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( "presence", # list(UserPresenceState) "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] - "failures", # dict of destination -> [failures] "device_destinations", # set of destinations )) @@ -503,7 +447,6 @@ def process_rows_for_federation(transaction_queue, rows): presence=[], keyed_edus={}, edus={}, - failures={}, device_destinations=set(), ) @@ -532,9 +475,5 @@ def process_rows_for_federation(transaction_queue, rows): edu.destination, edu.edu_type, edu.content, key=None, ) - for destination, failure_list in iteritems(buff.failures): - for failure in failure_list: - transaction_queue.send_failure(destination, failure) - for destination in buff.device_destinations: transaction_queue.send_device_messages(destination) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5a956ecfb3..78f9d40a3a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -30,7 +30,8 @@ from synapse.metrics import ( sent_edus_counter, sent_transactions_counter, ) -from synapse.util import PreserveLoggingContext, logcontext +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import logcontext from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter @@ -115,9 +116,6 @@ class TransactionQueue(object): ), ) - # destination -> list of tuple(failure, deferred) - self.pending_failures_by_dest = {} - # destination -> stream_id of last successfully sent to-device message. # NB: may be a long or an int. self.last_device_stream_id_by_dest = {} @@ -165,10 +163,11 @@ class TransactionQueue(object): if self._is_processing: return - # fire off a processing loop in the background. It's likely it will - # outlast the current request, so run it in the sentinel logcontext. - with PreserveLoggingContext(): - self._process_event_queue_loop() + # fire off a processing loop in the background + run_as_background_process( + "process_event_queue_for_federation", + self._process_event_queue_loop, + ) @defer.inlineCallbacks def _process_event_queue_loop(self): @@ -380,19 +379,6 @@ class TransactionQueue(object): self._attempt_new_transaction(destination) - def send_failure(self, failure, destination): - if destination == self.server_name or destination == "localhost": - return - - if not self.can_send_to(destination): - return - - self.pending_failures_by_dest.setdefault( - destination, [] - ).append(failure) - - self._attempt_new_transaction(destination) - def send_device_messages(self, destination): if destination == self.server_name or destination == "localhost": return @@ -432,14 +418,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Starting transaction loop", destination) - # Drop the logcontext before starting the transaction. It doesn't - # really make sense to log all the outbound transactions against - # whatever path led us to this point: that's pretty arbitrary really. - # - # (this also means we can fire off _perform_transaction without - # yielding) - with logcontext.PreserveLoggingContext(): - self._transaction_transmission_loop(destination) + run_as_background_process( + "federation_transaction_transmission_loop", + self._transaction_transmission_loop, + destination, + ) @defer.inlineCallbacks def _transaction_transmission_loop(self, destination): @@ -470,7 +453,6 @@ class TransactionQueue(object): pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_presence = self.pending_presence_by_dest.pop(destination, {}) - pending_failures = self.pending_failures_by_dest.pop(destination, []) pending_edus.extend( self.pending_edus_keyed_by_dest.pop(destination, {}).values() @@ -498,7 +480,7 @@ class TransactionQueue(object): logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) - if not pending_pdus and not pending_edus and not pending_failures: + if not pending_pdus and not pending_edus: logger.debug("TX [%s] Nothing to send", destination) self.last_device_stream_id_by_dest[destination] = ( device_stream_id @@ -508,7 +490,7 @@ class TransactionQueue(object): # END CRITICAL SECTION success = yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures, + destination, pending_pdus, pending_edus, ) if success: sent_transactions_counter.inc() @@ -585,14 +567,12 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks - def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures): + def _send_new_transaction(self, destination, pending_pdus, pending_edus): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) pdus = [x[0] for x in pending_pdus] edus = pending_edus - failures = [x.get_dict() for x in pending_failures] success = True @@ -602,11 +582,10 @@ class TransactionQueue(object): logger.debug( "TX [%s] {%s} Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", + " (pdus: %d, edus: %d)", destination, txn_id, len(pdus), len(edus), - len(failures) ) logger.debug("TX [%s] Persisting transaction...", destination) @@ -618,7 +597,6 @@ class TransactionQueue(object): destination=destination, pdus=pdus, edus=edus, - pdu_failures=failures, ) self._next_txn_id += 1 @@ -628,12 +606,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Persisted transaction", destination) logger.info( "TX [%s] {%s} Sending transaction [%s]," - " (PDUs: %d, EDUs: %d, failures: %d)", + " (PDUs: %d, EDUs: %d)", destination, txn_id, transaction.transaction_id, len(pdus), len(edus), - len(failures), ) # Actually send the transaction diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index c9beca27c2..3b5ea9515a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -283,11 +283,10 @@ class FederationSendServlet(BaseFederationServlet): ) logger.info( - "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)", + "Received txn %s from %s. (PDUs: %d, EDUs: %d)", transaction_id, origin, len(transaction_data.get("pdus", [])), len(transaction_data.get("edus", [])), - len(transaction_data.get("failures", [])), ) # We should ideally be getting this from the security layer. @@ -404,10 +403,10 @@ class FederationMakeLeaveServlet(BaseFederationServlet): class FederationSendLeaveServlet(BaseFederationServlet): - PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)" + PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)" @defer.inlineCallbacks - def on_PUT(self, origin, content, query, room_id, txid): + def on_PUT(self, origin, content, query, room_id, event_id): content = yield self.handler.on_send_leave_request(origin, content) defer.returnValue((200, content)) diff --git a/synapse/federation/units.py b/synapse/federation/units.py index bb1b3b13f7..c5ab14314e 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -73,7 +73,6 @@ class Transaction(JsonEncodedObject): "previous_ids", "pdus", "edus", - "pdu_failures", ] internal_keys = [ diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 47452700a8..b04f4234ca 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -43,6 +43,7 @@ from signedjson.sign import sign_json from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id from synapse.util.logcontext import run_in_background @@ -129,7 +130,7 @@ class GroupAttestionRenewer(object): self.attestations = hs.get_groups_attestation_signing() self._renew_attestations_loop = self.clock.looping_call( - self._renew_attestations, 30 * 60 * 1000, + self._start_renew_attestations, 30 * 60 * 1000, ) @defer.inlineCallbacks @@ -151,6 +152,9 @@ class GroupAttestionRenewer(object): defer.returnValue({}) + def _start_renew_attestations(self): + return run_as_background_process("renew_attestations", self._renew_attestations) + @defer.inlineCallbacks def _renew_attestations(self): """Called periodically to check if we need to update any of our attestations diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 4b9923d8c0..413425fed1 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,9 +17,7 @@ from .admin import AdminHandler from .directory import DirectoryHandler from .federation import FederationHandler from .identity import IdentityHandler -from .message import MessageHandler from .register import RegistrationHandler -from .room import RoomContextHandler from .search import SearchHandler @@ -44,10 +42,8 @@ class Handlers(object): def __init__(self, hs): self.registration_handler = RegistrationHandler(hs) - self.message_handler = MessageHandler(hs) self.federation_handler = FederationHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) - self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index b6a8b3aa3b..704181d2d3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -112,8 +112,9 @@ class BaseHandler(object): guest_access = event.content.get("guest_access", "forbidden") if guest_access != "can_join": if context: + current_state_ids = yield context.get_current_state_ids(self.store) current_state = yield self.store.get_events( - list(context.current_state_ids.values()) + list(current_state_ids.values()) ) else: current_state = yield self.state_handler.get_current_state( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ec9fe01a5a..ee41aed69e 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -23,6 +23,7 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -106,7 +107,9 @@ class ApplicationServicesHandler(object): yield self._check_user_exists(event.state_key) if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) + def start_scheduler(): + return self.scheduler.start().addErrback(log_failure) + run_as_background_process("as_scheduler", start_scheduler) self.started_scheduler = True # Fork off pushes to these services diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d3ecebd29f..91d8def08b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,8 +21,8 @@ import logging import sys import six -from six import iteritems -from six.moves import http_client +from six import iteritems, itervalues +from six.moves import http_client, zip from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json @@ -43,7 +43,6 @@ from synapse.crypto.event_signing import ( add_hashes_and_signatures, compute_event_signature, ) -from synapse.events.utils import prune_event from synapse.events.validator import EventValidator from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id @@ -52,8 +51,8 @@ from synapse.util.async import Linearizer from synapse.util.distributor import user_joined_room from synapse.util.frozenutils import unfreeze from synapse.util.logutils import log_function -from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination +from synapse.visibility import filter_events_for_server from ._base import BaseHandler @@ -77,7 +76,7 @@ class FederationHandler(BaseHandler): self.hs = hs self.store = hs.get_datastore() - self.replication_layer = hs.get_federation_client() + self.federation_client = hs.get_federation_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() @@ -256,7 +255,7 @@ class FederationHandler(BaseHandler): # know about for p in prevs - seen: state, got_auth_chain = ( - yield self.replication_layer.get_state_for_room( + yield self.federation_client.get_state_for_room( origin, pdu.room_id, p ) ) @@ -339,7 +338,7 @@ class FederationHandler(BaseHandler): # # see https://github.com/matrix-org/synapse/pull/1744 - missing_events = yield self.replication_layer.get_missing_events( + missing_events = yield self.federation_client.get_missing_events( origin, pdu.room_id, earliest_events_ids=list(latest), @@ -487,7 +486,10 @@ class FederationHandler(BaseHandler): # joined the room. Don't bother if the user is just # changing their profile info. newly_joined = True - prev_state_id = context.prev_state_ids.get( + + prev_state_ids = yield context.get_prev_state_ids(self.store) + + prev_state_id = prev_state_ids.get( (event.type, event.state_key) ) if prev_state_id: @@ -501,137 +503,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - @measure_func("_filter_events_for_server") - @defer.inlineCallbacks - def _filter_events_for_server(self, server_name, room_id, events): - """Filter the given events for the given server, redacting those the - server can't see. - - Assumes the server is currently in the room. - - Returns - list[FrozenEvent] - """ - # First lets check to see if all the events have a history visibility - # of "shared" or "world_readable". If thats the case then we don't - # need to check membership (as we know the server is in the room). - event_to_state_ids = yield self.store.get_state_ids_for_events( - frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - ) - ) - - visibility_ids = set() - for sids in event_to_state_ids.itervalues(): - hist = sids.get((EventTypes.RoomHistoryVisibility, "")) - if hist: - visibility_ids.add(hist) - - # If we failed to find any history visibility events then the default - # is "shared" visiblity. - if not visibility_ids: - defer.returnValue(events) - - event_map = yield self.store.get_events(visibility_ids) - all_open = all( - e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in event_map.itervalues() - ) - - if all_open: - defer.returnValue(events) - - # Ok, so we're dealing with events that have non-trivial visibility - # rules, so we need to also get the memberships of the room. - - event_to_state_ids = yield self.store.get_state_ids_for_events( - frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, None), - ) - ) - - # We only want to pull out member events that correspond to the - # server's domain. - - def check_match(id): - try: - return server_name == get_domain_from_id(id) - except Exception: - return False - - # Parses mapping `event_id -> (type, state_key) -> state event_id` - # to get all state ids that we're interested in. - event_map = yield self.store.get_events([ - e_id - for key_to_eid in list(event_to_state_ids.values()) - for key, e_id in key_to_eid.items() - if key[0] != EventTypes.Member or check_match(key[1]) - ]) - - event_to_state = { - e_id: { - key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.iteritems() - if inner_e_id in event_map - } - for e_id, key_to_eid in event_to_state_ids.iteritems() - } - - erased_senders = yield self.store.are_users_erased( - e.sender for e in events, - ) - - def redact_disallowed(event, state): - # if the sender has been gdpr17ed, always return a redacted - # copy of the event. - if erased_senders[event.sender]: - logger.info( - "Sender of %s has been erased, redacting", - event.event_id, - ) - return prune_event(event) - - if not state: - return event - - history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history: - visibility = history.content.get("history_visibility", "shared") - if visibility in ["invited", "joined"]: - # We now loop through all state events looking for - # membership states for the requesting server to determine - # if the server is either in the room or has been invited - # into the room. - for ev in state.itervalues(): - if ev.type != EventTypes.Member: - continue - try: - domain = get_domain_from_id(ev.state_key) - except Exception: - continue - - if domain != server_name: - continue - - memtype = ev.membership - if memtype == Membership.JOIN: - return event - elif memtype == Membership.INVITE: - if visibility == "invited": - return event - else: - return prune_event(event) - - return event - - defer.returnValue([ - redact_disallowed(e, event_to_state[e.event_id]) - for e in events - ]) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities): @@ -651,7 +522,7 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") - events = yield self.replication_layer.backfill( + events = yield self.federation_client.backfill( dest, room_id, limit=limit, @@ -699,7 +570,7 @@ class FederationHandler(BaseHandler): state_events = {} events_to_state = {} for e_id in edges: - state, auth = yield self.replication_layer.get_state_for_room( + state, auth = yield self.federation_client.get_state_for_room( destination=dest, room_id=room_id, event_id=e_id @@ -741,7 +612,7 @@ class FederationHandler(BaseHandler): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ logcontext.run_in_background( - self.replication_layer.get_pdu, + self.federation_client.get_pdu, [dest], event_id, outlier=True, @@ -863,7 +734,7 @@ class FederationHandler(BaseHandler): """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.iteritems() + for (e_type, state_key), event in iteritems(state) if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -880,7 +751,7 @@ class FederationHandler(BaseHandler): except Exception: pass - return sorted(joined_domains.iteritems(), key=lambda d: d[1]) + return sorted(joined_domains.items(), key=lambda d: d[1]) curr_domains = get_domains_from_state(curr_state) @@ -943,7 +814,7 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) tried_domains.add(self.server_name) - event_ids = list(extremities.iterkeys()) + event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = logcontext.preserve_fn( @@ -959,15 +830,15 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.itervalues() for e_id in ids.itervalues()], + [e_id for ids in itervalues(states) for e_id in itervalues(ids)], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.iteritems() + for k, e_id in iteritems(state_dict) if e_id in state_map - } for key, state_dict in states.iteritems() + } for key, state_dict in iteritems(states) } for e_id, _ in sorted_extremeties_tuple: @@ -1022,7 +893,7 @@ class FederationHandler(BaseHandler): Invites must be signed by the invitee's server before distribution. """ - pdu = yield self.replication_layer.send_invite( + pdu = yield self.federation_client.send_invite( destination=target_host, room_id=event.room_id, event_id=event.event_id, @@ -1038,16 +909,6 @@ class FederationHandler(BaseHandler): [auth_id for auth_id, _ in event.auth_events], include_given=True ) - - for event in auth: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue([e for e in auth]) @log_function @@ -1094,7 +955,7 @@ class FederationHandler(BaseHandler): target_hosts.insert(0, origin) except ValueError: pass - ret = yield self.replication_layer.send_join(target_hosts, event) + ret = yield self.federation_client.send_join(target_hosts, event) origin = ret["origin"] state = ret["state"] @@ -1248,10 +1109,12 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - state_ids = list(context.prev_state_ids.values()) + prev_state_ids = yield context.get_prev_state_ids(self.store) + + state_ids = list(prev_state_ids.values()) auth_chain = yield self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(list(context.prev_state_ids.values())) + state = yield self.store.get_events(list(prev_state_ids.values())) defer.returnValue({ "state": list(state.values()), @@ -1348,7 +1211,7 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.replication_layer.send_leave( + yield self.federation_client.send_leave( target_hosts, event ) @@ -1371,7 +1234,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content={},): - origin, pdu = yield self.replication_layer.make_membership_event( + origin, pdu = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, @@ -1416,7 +1279,7 @@ class FederationHandler(BaseHandler): @log_function def on_make_leave_request(self, room_id, user_id): """ We've received a /make_leave/ request, so we create a partial - join event for the room and return that. We do *not* persist or + leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. """ builder = self.event_builder_factory.new({ @@ -1503,18 +1366,6 @@ class FederationHandler(BaseHandler): del results[(event.type, event.state_key)] res = list(results.values()) - for event in res: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue(res) else: defer.returnValue([]) @@ -1558,7 +1409,7 @@ class FederationHandler(BaseHandler): limit ) - events = yield self._filter_events_for_server(origin, room_id, events) + events = yield filter_events_for_server(self.store, origin, events) defer.returnValue(events) @@ -1586,18 +1437,6 @@ class FederationHandler(BaseHandler): ) if event: - if self.is_mine_id(event.event_id): - # FIXME: This is a temporary work around where we occasionally - # return events slightly differently than when they were - # originally signed - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - in_room = yield self.auth.check_host_in_room( event.room_id, origin @@ -1605,8 +1444,8 @@ class FederationHandler(BaseHandler): if not in_room: raise AuthError(403, "Host not in room.") - events = yield self._filter_events_for_server( - origin, event.room_id, [event] + events = yield filter_events_for_server( + self.store, origin, [event], ) event = events[0] defer.returnValue(event) @@ -1681,7 +1520,7 @@ class FederationHandler(BaseHandler): yield self.store.persist_events( [ (ev_info["event"], context) - for ev_info, context in itertools.izip(event_infos, contexts) + for ev_info, context in zip(event_infos, contexts) ], backfilled=backfilled, ) @@ -1728,7 +1567,7 @@ class FederationHandler(BaseHandler): missing_auth_events.add(e_id) for e_id in missing_auth_events: - m_ev = yield self.replication_layer.get_pdu( + m_ev = yield self.federation_client.get_pdu( [origin], e_id, outlier=True, @@ -1801,8 +1640,9 @@ class FederationHandler(BaseHandler): ) if not auth_events: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -1862,15 +1702,6 @@ class FederationHandler(BaseHandler): local_auth_chain, remote_auth_chain ) - for event in ret["auth_chain"]: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) @@ -1896,8 +1727,8 @@ class FederationHandler(BaseHandler): min_depth=min_depth, ) - missing_events = yield self._filter_events_for_server( - origin, room_id, missing_events, + missing_events = yield filter_events_for_server( + self.store, origin, missing_events, ) defer.returnValue(missing_events) @@ -1946,7 +1777,7 @@ class FederationHandler(BaseHandler): logger.info("Missing auth: %s", missing_auth) # If we don't have all the auth events, we need to get them. try: - remote_auth_chain = yield self.replication_layer.get_event_auth( + remote_auth_chain = yield self.federation_client.get_event_auth( origin, event.room_id, event.event_id ) @@ -2051,9 +1882,10 @@ class FederationHandler(BaseHandler): break if do_resolution: + prev_state_ids = yield context.get_prev_state_ids(self.store) # 1. Get what we think is the auth chain. auth_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids + event, prev_state_ids ) local_auth_chain = yield self.store.get_auth_chain( auth_ids, include_given=True @@ -2061,7 +1893,7 @@ class FederationHandler(BaseHandler): try: # 2. Get remote difference. - result = yield self.replication_layer.query_auth( + result = yield self.federation_client.query_auth( origin, event.room_id, event.event_id, @@ -2143,21 +1975,34 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) if k != event_key } - context.current_state_ids = dict(context.current_state_ids) - context.current_state_ids.update(state_updates) - if context.delta_ids is not None: - context.delta_ids = dict(context.delta_ids) - context.delta_ids.update(state_updates) - context.prev_state_ids = dict(context.prev_state_ids) - context.prev_state_ids.update({ + current_state_ids = yield context.get_current_state_ids(self.store) + current_state_ids = dict(current_state_ids) + + current_state_ids.update(state_updates) + + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = dict(prev_state_ids) + + prev_state_ids.update({ k: a.event_id for k, a in iteritems(auth_events) }) - context.state_group = yield self.store.store_state_group( + + # create a new state group as a delta from the existing one. + prev_group = context.state_group + state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + prev_group=prev_group, + delta_ids=state_updates, + current_state_ids=current_state_ids, + ) + + yield context.update_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=state_updates, ) @defer.inlineCallbacks @@ -2347,7 +2192,7 @@ class FederationHandler(BaseHandler): yield member_handler.send_membership_event(None, event, context) else: destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) - yield self.replication_layer.forward_third_party_invite( + yield self.federation_client.forward_third_party_invite( destinations, room_id, event_dict, @@ -2397,7 +2242,8 @@ class FederationHandler(BaseHandler): event.content["third_party_invite"]["signed"]["token"] ) original_invite = None - original_invite_id = context.prev_state_ids.get(key) + prev_state_ids = yield context.get_prev_state_ids(self.store) + original_invite_id = prev_state_ids.get(key) if original_invite_id: original_invite = yield self.store.get_event( original_invite_id, allow_none=True @@ -2439,7 +2285,8 @@ class FederationHandler(BaseHandler): signed = event.content["third_party_invite"]["signed"] token = signed["token"] - invite_event_id = context.prev_state_ids.get( + prev_state_ids = yield context.get_prev_state_ids(self.store) + invite_event_id = prev_state_ids.get( (EventTypes.ThirdPartyInvite, token,) ) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index fb11716eb8..40e7580a61 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler): try: if event.membership == Membership.JOIN: room_end_token = now_token.room_key - deferred_room_state = self.state_handler.get_current_state( - event.room_id + deferred_room_state = run_in_background( + self.state_handler.get_current_state, + event.room_id, ) elif event.membership == Membership.LEAVE: room_end_token = "s%d" % (event.stream_ordering,) - deferred_room_state = self.store.get_state_for_events( - [event.event_id], None + deferred_room_state = run_in_background( + self.store.get_state_for_events, + [event.event_id], None, ) deferred_room_state.addCallback( lambda states: states[event.event_id] @@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler): receipts = [] defer.returnValue(receipts) - presence, receipts, (messages, token) = yield defer.gatherResults( - [ - run_in_background(get_presence), - run_in_background(get_receipts), - run_in_background( - self.store.get_recent_events_for_room, - room_id, - limit=limit, - end_token=now_token.room_key, - ) - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + presence, receipts, (messages, token) = yield make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, + room_id, + limit=limit, + end_token=now_token.room_key, + ) + ], + consumeErrors=True, + ).addErrback(unwrapFirstError), + ) messages = yield filter_events_for_client( self.store, user_id, messages, is_peeking=is_peeking, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852ceb..39d7724778 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed -from twisted.python.failure import Failure from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError @@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import send_event_to_master -from synapse.types import RoomAlias, RoomStreamToken, UserID -from synapse.util.async import Limiter, ReadWriteLock +from synapse.types import RoomAlias, UserID +from synapse.util.async import Linearizer from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func -from synapse.util.stringutils import random_string -from synapse.visibility import filter_events_for_client from ._base import BaseHandler logger = logging.getLogger(__name__) -class PurgeStatus(object): - """Object tracking the status of a purge request - - This class contains information on the progress of a purge request, for - return by get_purge_status. - - Attributes: - status (int): Tracks whether this request has completed. One of - STATUS_{ACTIVE,COMPLETE,FAILED} +class MessageHandler(object): + """Contains some read only APIs to get state about a room """ - STATUS_ACTIVE = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - - STATUS_TEXT = { - STATUS_ACTIVE: "active", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - } - - def __init__(self): - self.status = PurgeStatus.STATUS_ACTIVE - - def asdict(self): - return { - "status": PurgeStatus.STATUS_TEXT[self.status] - } - - -class MessageHandler(BaseHandler): - def __init__(self, hs): - super(MessageHandler, self).__init__(hs) - self.hs = hs - self.state = hs.get_state_handler() + self.auth = hs.get_auth() self.clock = hs.get_clock() - - self.pagination_lock = ReadWriteLock() - self._purges_in_progress_by_room = set() - # map from purge id to PurgeStatus - self._purges_by_id = {} - - def start_purge_history(self, room_id, token, - delete_local_events=False): - """Start off a history purge on a room. - - Args: - room_id (str): The room to purge from - - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones - - Returns: - str: unique ID for this purge transaction. - """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, - "History purge already in progress for %s" % (room_id, ), - ) - - purge_id = random_string(16) - - # we log the purge_id here so that it can be tied back to the - # request id in the log lines. - logger.info("[purge] starting purge_id %s", purge_id) - - self._purges_by_id[purge_id] = PurgeStatus() - run_in_background( - self._purge_history, - purge_id, room_id, token, delete_local_events, - ) - return purge_id - - @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, token, - delete_local_events): - """Carry out a history purge on a room. - - Args: - purge_id (str): The id for this purge - room_id (str): The room to purge from - token (str): topological token to delete events before - delete_local_events (bool): True to delete local events as well as - remote ones - - Returns: - Deferred - """ - self._purges_in_progress_by_room.add(room_id) - try: - with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history( - room_id, token, delete_local_events, - ) - logger.info("[purge] complete") - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE - except Exception: - logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED - finally: - self._purges_in_progress_by_room.discard(room_id) - - # remove the purge from the list 24 hours after it completes - def clear_purge(): - del self._purges_by_id[purge_id] - self.hs.get_reactor().callLater(24 * 3600, clear_purge) - - def get_purge_status(self, purge_id): - """Get the current status of an active purge - - Args: - purge_id (str): purge_id returned by start_purge_history - - Returns: - PurgeStatus|None - """ - return self._purges_by_id.get(purge_id) - - @defer.inlineCallbacks - def get_messages(self, requester, room_id=None, pagin_config=None, - as_client_event=True, event_filter=None): - """Get messages in a room. - - Args: - requester (Requester): The user requesting messages. - room_id (str): The room they want messages from. - pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. - as_client_event (bool): True to get events in client-server format. - event_filter (Filter): Filter to apply to results or None - Returns: - dict: Pagination API results - """ - user_id = requester.user.to_string() - - if pagin_config.from_token: - room_token = pagin_config.from_token.room_key - else: - pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token_for_room( - room_id=room_id - ) - ) - room_token = pagin_config.from_token.room_key - - room_token = RoomStreamToken.parse(room_token) - - pagin_config.from_token = pagin_config.from_token.copy_and_replace( - "room_key", str(room_token) - ) - - source_config = pagin_config.get_source_config("room") - - with (yield self.pagination_lock.read(room_id)): - membership, member_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token( - room_id, room_token.stream - ) - - if membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - leave_token = yield self.store.get_topological_token_for_event( - member_event_id - ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo - ) - - events, next_key = yield self.store.paginate_room_events( - room_id=room_id, - from_key=source_config.from_key, - to_key=source_config.to_key, - direction=source_config.direction, - limit=source_config.limit, - event_filter=event_filter, - ) - - next_token = pagin_config.from_token.copy_and_replace( - "room_key", next_key - ) - - if not events: - defer.returnValue({ - "chunk": [], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - }) - - if event_filter: - events = event_filter.filter(events) - - events = yield filter_events_for_client( - self.store, - user_id, - events, - is_peeking=(member_event_id is None), - ) - - time_now = self.clock.time_msec() - - chunk = { - "chunk": [ - serialize_event(e, time_now, as_client_event) - for e in events - ], - "start": pagin_config.from_token.to_string(), - "end": next_token.to_string(), - } - - defer.returnValue(chunk) + self.state = hs.get_state_handler() + self.store = hs.get_datastore() @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, @@ -286,12 +64,12 @@ class MessageHandler(BaseHandler): Raises: SynapseError if something went wrong. """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( + membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) if membership == Membership.JOIN: - data = yield self.state_handler.get_current_state( + data = yield self.state.get_current_state( room_id, event_type, state_key ) elif membership == Membership.LEAVE: @@ -304,31 +82,6 @@ class MessageHandler(BaseHandler): defer.returnValue(data) @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - - @defer.inlineCallbacks def get_state_events(self, user_id, room_id, is_guest=False): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has @@ -340,12 +93,12 @@ class MessageHandler(BaseHandler): Returns: A list of dicts representing state events. [{}, {}, {}] """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( + membership, membership_event_id = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) + room_state = yield self.state.get_current_state(room_id) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( [membership_event_id], None @@ -373,7 +126,7 @@ class MessageHandler(BaseHandler): if not requester.app_service: # We check AS auth after fetching the room membership, as it # requires us to pull out all joined members anyway. - membership, _ = yield self._check_in_room_or_world_readable( + membership, _ = yield self.auth.check_in_room_or_world_readable( room_id, user_id ) if membership != Membership.JOIN: @@ -427,7 +180,7 @@ class EventCreationHandler(object): # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) + self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") self.action_generator = hs.get_action_generator() @@ -630,7 +383,8 @@ class EventCreationHandler(object): If so, returns the version of the event in context. Otherwise, returns None. """ - prev_event_id = context.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_event_id = prev_state_ids.get((event.type, event.state_key)) prev_event = yield self.store.get_event(prev_event_id, allow_none=True) if not prev_event: return @@ -752,8 +506,8 @@ class EventCreationHandler(object): event = builder.build() logger.debug( - "Created event %s with state: %s", - event.event_id, context.prev_state_ids, + "Created event %s", + event.event_id, ) defer.returnValue( @@ -806,8 +560,9 @@ class EventCreationHandler(object): # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( - self.hs.get_clock(), - self.http_client, + clock=self.hs.get_clock(), + store=self.store, + client=self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, requester=requester, @@ -884,9 +639,11 @@ class EventCreationHandler(object): e.sender == event.sender ) + current_state_ids = yield context.get_current_state_ids(self.store) + state_to_include_ids = [ e_id - for k, e_id in iteritems(context.current_state_ids) + for k, e_id in iteritems(current_state_ids) if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -922,8 +679,9 @@ class EventCreationHandler(object): ) if event.type == EventTypes.Redaction: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -943,11 +701,13 @@ class EventCreationHandler(object): "You don't have permission to redact events" ) - if event.type == EventTypes.Create and context.prev_state_ids: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) + if event.type == EventTypes.Create: + prev_state_ids = yield context.get_prev_state_ids(self.store) + if prev_state_ids: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) (event_stream_id, max_stream_id) = yield self.store.persist_event( event, context=context diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py new file mode 100644 index 0000000000..b2849783ed --- /dev/null +++ b/synapse/handlers/pagination.py @@ -0,0 +1,265 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2017 - 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from twisted.internet import defer +from twisted.python.failure import Failure + +from synapse.api.constants import Membership +from synapse.api.errors import SynapseError +from synapse.events.utils import serialize_event +from synapse.types import RoomStreamToken +from synapse.util.async import ReadWriteLock +from synapse.util.logcontext import run_in_background +from synapse.util.stringutils import random_string +from synapse.visibility import filter_events_for_client + +logger = logging.getLogger(__name__) + + +class PurgeStatus(object): + """Object tracking the status of a purge request + + This class contains information on the progress of a purge request, for + return by get_purge_status. + + Attributes: + status (int): Tracks whether this request has completed. One of + STATUS_{ACTIVE,COMPLETE,FAILED} + """ + + STATUS_ACTIVE = 0 + STATUS_COMPLETE = 1 + STATUS_FAILED = 2 + + STATUS_TEXT = { + STATUS_ACTIVE: "active", + STATUS_COMPLETE: "complete", + STATUS_FAILED: "failed", + } + + def __init__(self): + self.status = PurgeStatus.STATUS_ACTIVE + + def asdict(self): + return { + "status": PurgeStatus.STATUS_TEXT[self.status] + } + + +class PaginationHandler(object): + """Handles pagination and purge history requests. + + These are in the same handler due to the fact we need to block clients + paginating during a purge. + """ + + def __init__(self, hs): + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + + self.pagination_lock = ReadWriteLock() + self._purges_in_progress_by_room = set() + # map from purge id to PurgeStatus + self._purges_by_id = {} + + def start_purge_history(self, room_id, token, + delete_local_events=False): + """Start off a history purge on a room. + + Args: + room_id (str): The room to purge from + + token (str): topological token to delete events before + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + str: unique ID for this purge transaction. + """ + if room_id in self._purges_in_progress_by_room: + raise SynapseError( + 400, + "History purge already in progress for %s" % (room_id, ), + ) + + purge_id = random_string(16) + + # we log the purge_id here so that it can be tied back to the + # request id in the log lines. + logger.info("[purge] starting purge_id %s", purge_id) + + self._purges_by_id[purge_id] = PurgeStatus() + run_in_background( + self._purge_history, + purge_id, room_id, token, delete_local_events, + ) + return purge_id + + @defer.inlineCallbacks + def _purge_history(self, purge_id, room_id, token, + delete_local_events): + """Carry out a history purge on a room. + + Args: + purge_id (str): The id for this purge + room_id (str): The room to purge from + token (str): topological token to delete events before + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + Deferred + """ + self._purges_in_progress_by_room.add(room_id) + try: + with (yield self.pagination_lock.write(room_id)): + yield self.store.purge_history( + room_id, token, delete_local_events, + ) + logger.info("[purge] complete") + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + except Exception: + logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED + finally: + self._purges_in_progress_by_room.discard(room_id) + + # remove the purge from the list 24 hours after it completes + def clear_purge(): + del self._purges_by_id[purge_id] + self.hs.get_reactor().callLater(24 * 3600, clear_purge) + + def get_purge_status(self, purge_id): + """Get the current status of an active purge + + Args: + purge_id (str): purge_id returned by start_purge_history + + Returns: + PurgeStatus|None + """ + return self._purges_by_id.get(purge_id) + + @defer.inlineCallbacks + def get_messages(self, requester, room_id=None, pagin_config=None, + as_client_event=True, event_filter=None): + """Get messages in a room. + + Args: + requester (Requester): The user requesting messages. + room_id (str): The room they want messages from. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config rules to apply, if any. + as_client_event (bool): True to get events in client-server format. + event_filter (Filter): Filter to apply to results or None + Returns: + dict: Pagination API results + """ + user_id = requester.user.to_string() + + if pagin_config.from_token: + room_token = pagin_config.from_token.room_key + else: + pagin_config.from_token = ( + yield self.hs.get_event_sources().get_current_token_for_room( + room_id=room_id + ) + ) + room_token = pagin_config.from_token.room_key + + room_token = RoomStreamToken.parse(room_token) + + pagin_config.from_token = pagin_config.from_token.copy_and_replace( + "room_key", str(room_token) + ) + + source_config = pagin_config.get_source_config("room") + + with (yield self.pagination_lock.read(room_id)): + membership, member_event_id = yield self.auth.check_in_room_or_world_readable( + room_id, user_id + ) + + if source_config.direction == 'b': + # if we're going backwards, we might need to backfill. This + # requires that we have a topo token. + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token( + room_id, room_token.stream + ) + + if membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + leave_token = yield self.store.get_topological_token_for_event( + member_event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < max_topo: + source_config.from_key = str(leave_token) + + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, max_topo + ) + + events, next_key = yield self.store.paginate_room_events( + room_id=room_id, + from_key=source_config.from_key, + to_key=source_config.to_key, + direction=source_config.direction, + limit=source_config.limit, + event_filter=event_filter, + ) + + next_token = pagin_config.from_token.copy_and_replace( + "room_key", next_key + ) + + if not events: + defer.returnValue({ + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + }) + + if event_filter: + events = event_filter.filter(events) + + events = yield filter_events_for_client( + self.store, + user_id, + events, + is_peeking=(member_event_id is None), + ) + + time_now = self.clock.time_msec() + + chunk = { + "chunk": [ + serialize_event(e, time_now, as_client_event) + for e in events + ], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 859f6d2b2e..cb5c6d587e 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer from synapse.api.errors import AuthError, CodeMessageException, SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler @@ -41,7 +42,7 @@ class ProfileHandler(BaseHandler): if hs.config.worker_app is None: self.clock.looping_call( - self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, + self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS, ) @defer.inlineCallbacks @@ -254,6 +255,12 @@ class ProfileHandler(BaseHandler): room_id, str(e.message) ) + def _start_update_remote_profile_cache(self): + return run_as_background_process( + "Update remote profile", self._update_remote_profile_cache, + ) + + @defer.inlineCallbacks def _update_remote_profile_cache(self): """Called periodically to check profiles of remote users we haven't checked in a while. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f67512078b..7b7804d9b2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -15,6 +15,7 @@ # limitations under the License. """Contains functions for performing events on rooms.""" +import itertools import logging import math import string @@ -24,7 +25,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset from synapse.api.errors import AuthError, Codes, StoreError, SynapseError -from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID +from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils from synapse.visibility import filter_events_for_client @@ -395,9 +396,13 @@ class RoomCreationHandler(BaseHandler): ) -class RoomContextHandler(BaseHandler): +class RoomContextHandler(object): + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + @defer.inlineCallbacks - def get_event_context(self, user, room_id, event_id, limit): + def get_event_context(self, user, room_id, event_id, limit, event_filter): """Retrieves events, pagination tokens and state around a given event in a room. @@ -407,6 +412,8 @@ class RoomContextHandler(BaseHandler): event_id (str) limit (int): The maximum number of events to return in total (excluding state). + event_filter (Filter|None): the filter to apply to the events returned + (excluding the target event_id) Returns: dict, or None if the event isn't found @@ -414,8 +421,6 @@ class RoomContextHandler(BaseHandler): before_limit = math.floor(limit / 2.) after_limit = limit - before_limit - now_token = yield self.hs.get_event_sources().get_current_token() - users = yield self.store.get_users_in_room(room_id) is_peeking = user.to_string() not in users @@ -441,7 +446,7 @@ class RoomContextHandler(BaseHandler): ) results = yield self.store.get_events_around( - room_id, event_id, before_limit, after_limit + room_id, event_id, before_limit, after_limit, event_filter ) results["events_before"] = yield filter_evts(results["events_before"]) @@ -453,16 +458,35 @@ class RoomContextHandler(BaseHandler): else: last_event_id = event_id + types = None + filtered_types = None + if event_filter and event_filter.lazy_load_members(): + members = set(ev.sender for ev in itertools.chain( + results["events_before"], + (results["event"],), + results["events_after"], + )) + filtered_types = [EventTypes.Member] + types = [(EventTypes.Member, member) for member in members] + + # XXX: why do we return the state as of the last event rather than the + # first? Shouldn't we be consistent with /sync? + # https://github.com/matrix-org/matrix-doc/issues/687 + state = yield self.store.get_state_for_events( - [last_event_id], None + [last_event_id], types, filtered_types=filtered_types, ) results["state"] = list(state[last_event_id].values()) - results["start"] = now_token.copy_and_replace( + # We use a dummy token here as we only care about the room portion of + # the token, which we replace. + token = StreamToken.START + + results["start"] = token.copy_and_replace( "room_key", results["start"] ).to_string() - results["end"] = now_token.copy_and_replace( + results["end"] = token.copy_and_replace( "room_key", results["end"] ).to_string() diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 00f2e279bc..0d4a3f4677 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -201,7 +201,9 @@ class RoomMemberHandler(object): ratelimit=ratelimit, ) - prev_member_event_id = context.prev_state_ids.get( + prev_state_ids = yield context.get_prev_state_ids(self.store) + + prev_member_event_id = prev_state_ids.get( (EventTypes.Member, target.to_string()), None ) @@ -496,9 +498,10 @@ class RoomMemberHandler(object): if prev_event is not None: return + prev_state_ids = yield context.get_prev_state_ids(self.store) if event.membership == Membership.JOIN: if requester.is_guest: - guest_can_join = yield self._can_guest_join(context.prev_state_ids) + guest_can_join = yield self._can_guest_join(prev_state_ids) if not guest_can_join: # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. @@ -517,7 +520,7 @@ class RoomMemberHandler(object): ratelimit=ratelimit, ) - prev_member_event_id = context.prev_state_ids.get( + prev_member_event_id = prev_state_ids.get( (EventTypes.Member, event.state_key), None ) @@ -705,6 +708,10 @@ class RoomMemberHandler(object): inviter_display_name = member_event.content.get("displayname", "") inviter_avatar_url = member_event.content.get("avatar_url", "") + # if user has no display name, default to their MXID + if not inviter_display_name: + inviter_display_name = user.to_string() + canonical_room_alias = "" canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, "")) if canonical_alias_event: diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 69ae9731d5..c464adbd0b 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -287,7 +287,7 @@ class SearchHandler(BaseHandler): contexts = {} for event in allowed_events: res = yield self.store.get_events_around( - event.room_id, event.event_id, before_limit, after_limit + event.room_id, event.event_id, before_limit, after_limit, ) logger.info( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c24e35362a..dff1f67dcb 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2015 - 2016 OpenMarket Ltd +# Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership from synapse.push.clientformat import format_push_rules_for_user from synapse.types import RoomStreamToken from synapse.util.async import concurrently_execute +from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.lrucache import LruCache from synapse.util.caches.response_cache import ResponseCache from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func @@ -32,6 +35,14 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) +# Store the cache that tracks which lazy-loaded members have been sent to a given +# client for no more than 30 minutes. +LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 + +# Remember the last 100 members we sent to a client for the purposes of +# avoiding redundantly sending the same lazy-loaded members to the client +LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 + SyncConfig = collections.namedtuple("SyncConfig", [ "user", @@ -181,6 +192,12 @@ class SyncHandler(object): self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() + # ExpiringCache((User, Device)) -> LruCache(state_key => event_id) + self.lazy_loaded_members_cache = ExpiringCache( + "lazy_loaded_members_cache", self.clock, + max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, + ) + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise @@ -416,29 +433,44 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def get_state_after_event(self, event): + def get_state_after_event(self, event, types=None, filtered_types=None): """ Get the room state after the given event Args: event(synapse.events.EventBase): event of interest + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A Deferred map from ((type, state_key)->Event) """ - state_ids = yield self.store.get_state_ids_for_event(event.event_id) + state_ids = yield self.store.get_state_ids_for_event( + event.event_id, types, filtered_types=filtered_types, + ) if event.is_state(): state_ids = state_ids.copy() state_ids[(event.type, event.state_key)] = event.event_id defer.returnValue(state_ids) @defer.inlineCallbacks - def get_state_at(self, room_id, stream_position): + def get_state_at(self, room_id, stream_position, types=None, filtered_types=None): """ Get the room state at a particular stream position Args: room_id(str): room for which to get state stream_position(StreamToken): point at which to get state + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A Deferred map from ((type, state_key)->Event) @@ -453,7 +485,9 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] - state = yield self.get_state_after_event(last_event) + state = yield self.get_state_after_event( + last_event, types, filtered_types=filtered_types, + ) else: # no events in this room - so presumably no state @@ -485,59 +519,129 @@ class SyncHandler(object): # TODO(mjark) Check for new redactions in the state events. with Measure(self.clock, "compute_state_delta"): + + types = None + filtered_types = None + + lazy_load_members = sync_config.filter_collection.lazy_load_members() + include_redundant_members = ( + sync_config.filter_collection.include_redundant_members() + ) + + if lazy_load_members: + # We only request state for the members needed to display the + # timeline: + + types = [ + (EventTypes.Member, state_key) + for state_key in set( + event.sender # FIXME: we also care about invite targets etc. + for event in batch.events + ) + ] + + # only apply the filtering to room members + filtered_types = [EventTypes.Member] + + timeline_state = { + (event.type, event.state_key): event.event_id + for event in batch.events if event.is_state() + } + if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id + batch.events[-1].event_id, types=types, + filtered_types=filtered_types, ) state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id + batch.events[0].event_id, types=types, + filtered_types=filtered_types, ) + else: current_state_ids = yield self.get_state_at( - room_id, stream_position=now_token + room_id, stream_position=now_token, types=types, + filtered_types=filtered_types, ) state_ids = current_state_ids - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_ids, previous={}, current=current_state_ids, + lazy_load_members=lazy_load_members, ) elif batch.limited: state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token + room_id, stream_position=since_token, types=types, + filtered_types=filtered_types, ) current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id + batch.events[-1].event_id, types=types, + filtered_types=filtered_types, ) state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id + batch.events[0].event_id, types=types, + filtered_types=filtered_types, ) - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_at_timeline_start, previous=state_at_previous_sync, current=current_state_ids, + lazy_load_members=lazy_load_members, ) else: state_ids = {} + if lazy_load_members: + if types: + state_ids = yield self.store.get_state_ids_for_event( + batch.events[0].event_id, types=types, + filtered_types=filtered_types, + ) + + if lazy_load_members and not include_redundant_members: + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + logger.debug("creating LruCache for %r", cache_key) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) + self.lazy_loaded_members_cache[cache_key] = cache + else: + logger.debug("found LruCache for %r", cache_key) + + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. + if since_token is None: + logger.debug("clearing LruCache for %r", cache_key) + cache.clear() + else: + # only send members which aren't in our LruCache (either + # because they're new to this client or have been pushed out + # of the cache) + logger.debug("filtering state from %r...", state_ids) + state_ids = { + t: event_id + for t, event_id in state_ids.iteritems() + if cache.get(t[1]) != event_id + } + logger.debug("...to %r", state_ids) + + # add any member IDs we are about to send into our LruCache + for t, event_id in itertools.chain( + state_ids.items(), + timeline_state.items(), + ): + if t[0] == EventTypes.Member: + cache.set(t[1], event_id) state = {} if state_ids: @@ -1448,7 +1552,9 @@ def _action_has_highlight(actions): return False -def _calculate_state(timeline_contains, timeline_start, previous, current): +def _calculate_state( + timeline_contains, timeline_start, previous, current, lazy_load_members, +): """Works out what state to include in a sync response. Args: @@ -1457,6 +1563,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): previous (dict): state at the end of the previous sync (or empty dict if this is an initial sync) current (dict): state at the end of the timeline + lazy_load_members (bool): whether to return members from timeline_start + or not. assumes that timeline_start has already been filtered to + include only the members the client needs to know about. Returns: dict @@ -1472,9 +1581,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): } c_ids = set(e for e in current.values()) - tc_ids = set(e for e in timeline_contains.values()) - p_ids = set(e for e in previous.values()) ts_ids = set(e for e in timeline_start.values()) + p_ids = set(e for e in previous.values()) + tc_ids = set(e for e in timeline_contains.values()) + + # If we are lazyloading room members, we explicitly add the membership events + # for the senders in the timeline into the state block returned by /sync, + # as we may not have sent them to the client before. We find these membership + # events by filtering them out of timeline_start, which has already been filtered + # to only include membership events for the senders in the timeline. + # In practice, we can do this by removing them from the p_ids list, + # which is the list of relevant state we know we have already sent to the client. + # see https://github.com/matrix-org/synapse/pull/2970 + # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 + + if lazy_load_members: + p_ids.difference_update( + e for t, e in timeline_start.iteritems() + if t[0] == EventTypes.Member + ) state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids diff --git a/synapse/http/client.py b/synapse/http/client.py index d6a0d75b2b..25b6307884 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE from twisted.internet import defer, protocol, reactor, ssl, task from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent -from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer from twisted.web.client import ( + Agent, + BrowserLikeRedirectAgent, + ContentDecoderAgent, + FileBodyProducer as TwistedFileBodyProducer, GzipDecoder, HTTPConnectionPool, PartialDownloadError, diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index f24b4b949c..588e280571 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -38,7 +38,8 @@ outgoing_responses_counter = Counter( ) response_timer = Histogram( - "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"] + "synapse_http_server_response_time_seconds", "sec", + ["method", "servlet", "tag", "code"], ) response_ru_utime = Counter( @@ -171,11 +172,13 @@ class RequestMetrics(object): ) return - outgoing_responses_counter.labels(request.method, str(request.code)).inc() + response_code = str(request.code) + + outgoing_responses_counter.labels(request.method, response_code).inc() response_count.labels(request.method, self.name, tag).inc() - response_timer.labels(request.method, self.name, tag).observe( + response_timer.labels(request.method, self.name, tag, response_code).observe( time_sec - self.start ) diff --git a/synapse/http/site.py b/synapse/http/site.py index 41dd974cea..5fd30a4c2c 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -42,9 +42,10 @@ class SynapseRequest(Request): which is handling the request, and returns a context manager. """ - def __init__(self, site, *args, **kw): - Request.__init__(self, *args, **kw) + def __init__(self, site, channel, *args, **kw): + Request.__init__(self, channel, *args, **kw) self.site = site + self._channel = channel self.authenticated_entity = None self.start_time = 0 diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py new file mode 100644 index 0000000000..ce678d5f75 --- /dev/null +++ b/synapse/metrics/background_process_metrics.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily + +from twisted.internet import defer + +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext + +_background_process_start_count = Counter( + "synapse_background_process_start_count", + "Number of background processes started", + ["name"], +) + +# we set registry=None in all of these to stop them getting registered with +# the default registry. Instead we collect them all via the CustomCollector, +# which ensures that we can update them before they are collected. +# +_background_process_ru_utime = Counter( + "synapse_background_process_ru_utime_seconds", + "User CPU time used by background processes, in seconds", + ["name"], + registry=None, +) + +_background_process_ru_stime = Counter( + "synapse_background_process_ru_stime_seconds", + "System CPU time used by background processes, in seconds", + ["name"], + registry=None, +) + +_background_process_db_txn_count = Counter( + "synapse_background_process_db_txn_count", + "Number of database transactions done by background processes", + ["name"], + registry=None, +) + +_background_process_db_txn_duration = Counter( + "synapse_background_process_db_txn_duration_seconds", + ("Seconds spent by background processes waiting for database " + "transactions, excluding scheduling time"), + ["name"], + registry=None, +) + +_background_process_db_sched_duration = Counter( + "synapse_background_process_db_sched_duration_seconds", + "Seconds spent by background processes waiting for database connections", + ["name"], + registry=None, +) + +# map from description to a counter, so that we can name our logcontexts +# incrementally. (It actually duplicates _background_process_start_count, but +# it's much simpler to do so than to try to combine them.) +_background_process_counts = dict() # type: dict[str, int] + +# map from description to the currently running background processes. +# +# it's kept as a dict of sets rather than a big set so that we can keep track +# of process descriptions that no longer have any active processes. +_background_processes = dict() # type: dict[str, set[_BackgroundProcess]] + + +class _Collector(object): + """A custom metrics collector for the background process metrics. + + Ensures that all of the metrics are up-to-date with any in-flight processes + before they are returned. + """ + def collect(self): + background_process_in_flight_count = GaugeMetricFamily( + "synapse_background_process_in_flight_count", + "Number of background processes in flight", + labels=["name"], + ) + + for desc, processes in six.iteritems(_background_processes): + background_process_in_flight_count.add_metric( + (desc,), len(processes), + ) + for process in processes: + process.update_metrics() + + yield background_process_in_flight_count + + # now we need to run collect() over each of the static Counters, and + # yield each metric they return. + for m in ( + _background_process_ru_utime, + _background_process_ru_stime, + _background_process_db_txn_count, + _background_process_db_txn_duration, + _background_process_db_sched_duration, + ): + for r in m.collect(): + yield r + + +REGISTRY.register(_Collector()) + + +class _BackgroundProcess(object): + def __init__(self, desc, ctx): + self.desc = desc + self._context = ctx + self._reported_stats = None + + def update_metrics(self): + """Updates the metrics with values from this process.""" + new_stats = self._context.get_resource_usage() + if self._reported_stats is None: + diff = new_stats + else: + diff = new_stats - self._reported_stats + self._reported_stats = new_stats + + _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime) + _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime) + _background_process_db_txn_count.labels(self.desc).inc( + diff.db_txn_count, + ) + _background_process_db_txn_duration.labels(self.desc).inc( + diff.db_txn_duration_sec, + ) + _background_process_db_sched_duration.labels(self.desc).inc( + diff.db_sched_duration_sec, + ) + + +def run_as_background_process(desc, func, *args, **kwargs): + """Run the given function in its own logcontext, with resource metrics + + This should be used to wrap processes which are fired off to run in the + background, instead of being associated with a particular request. + + It returns a Deferred which completes when the function completes, but it doesn't + follow the synapse logcontext rules, which makes it appropriate for passing to + clock.looping_call and friends (or for firing-and-forgetting in the middle of a + normal synapse inlineCallbacks function). + + Args: + desc (str): a description for this background process type + func: a function, which may return a Deferred + args: positional args for func + kwargs: keyword args for func + + Returns: Deferred which returns the result of func, but note that it does not + follow the synapse logcontext rules. + """ + @defer.inlineCallbacks + def run(): + count = _background_process_counts.get(desc, 0) + _background_process_counts[desc] = count + 1 + _background_process_start_count.labels(desc).inc() + + with LoggingContext(desc) as context: + context.request = "%s-%i" % (desc, count) + proc = _BackgroundProcess(desc, context) + _background_processes.setdefault(desc, set()).add(proc) + try: + yield func(*args, **kwargs) + finally: + proc.update_metrics() + _background_processes[desc].remove(proc) + + with PreserveLoggingContext(): + return run() diff --git a/synapse/notifier.py b/synapse/notifier.py index 51cbd66f06..e650c3e494 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -274,7 +274,7 @@ class Notifier(object): logger.exception("Error notifying application services of event") def on_new_event(self, stream_key, new_token, users=[], rooms=[]): - """ Used to inform listeners that something has happend event wise. + """ Used to inform listeners that something has happened event wise. Will wake up all listeners for the given users and rooms. """ diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index bb181d94ee..1d14d3639c 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object): @defer.inlineCallbacks def _get_power_levels_and_sender_level(self, event, context): - pl_event_id = context.prev_state_ids.get(POWER_KEY) + prev_state_ids = yield context.get_prev_state_ids(self.store) + pl_event_id = prev_state_ids.get(POWER_KEY) if pl_event_id: # fastpath: if there's a power level event, that's all we need, and # not having a power level event is an extreme edge case @@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object): auth_events = {POWER_KEY: pl_event} else: auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=False, + event, prev_state_ids, for_verification=False, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -304,7 +305,7 @@ class RulesForRoom(object): push_rules_delta_state_cache_metric.inc_hits() else: - current_state_ids = context.current_state_ids + current_state_ids = yield context.get_current_state_ids(self.store) push_rules_delta_state_cache_metric.inc_misses() push_rules_state_size_counter.inc(len(current_state_ids)) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 2eede54792..5227bc333d 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -34,12 +34,13 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def send_event_to_master(clock, client, host, port, requester, event, context, +def send_event_to_master(clock, store, client, host, port, requester, event, context, ratelimit, extra_users): """Send event to be handled on the master Args: clock (synapse.util.Clock) + store (DataStore) client (SimpleHttpClient) host (str): host of master port (int): port on master listening for HTTP replication @@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context, host, port, event.event_id, ) + serialized_context = yield context.serialize(event, store) + payload = { "event": event.get_pdu_json(), "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, - "context": context.serialize(event), + "context": serialized_context, "requester": requester.serialize(), "ratelimit": ratelimit, "extra_users": [u.to_string() for u in extra_users], diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e592ab57bf..970e94313e 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -192,7 +192,7 @@ class ReplicationClientHandler(object): """Returns a deferred that is resolved when we receive a SYNC command with given data. - Used by tests. + [Not currently] used by tests. """ return self.awaiting_syncs.setdefault(data, defer.Deferred()) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 611fb66e1d..fd59f1595f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -25,6 +25,7 @@ from twisted.internet import defer from twisted.internet.protocol import Factory from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.metrics import Measure, measure_func from .protocol import ServerReplicationStreamProtocol @@ -117,7 +118,6 @@ class ReplicationStreamer(object): for conn in self.connections: conn.send_error("server shutting down") - @defer.inlineCallbacks def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the connections if there are. @@ -132,14 +132,16 @@ class ReplicationStreamer(object): stream.discard_updates_and_advance() return - # If we're in the process of checking for new updates, mark that fact - # and return + self.pending_updates = True + if self.is_looping: - logger.debug("Noitifier poke loop already running") - self.pending_updates = True + logger.debug("Notifier poke loop already running") return - self.pending_updates = True + run_as_background_process("replication_notifier", self._run_notifier_loop) + + @defer.inlineCallbacks + def _run_notifier_loop(self): self.is_looping = True try: diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 75c2a4ec8e..3418f06fd6 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,13 +14,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import PY3 + from synapse.http.server import JsonResource from synapse.rest.client import versions -from synapse.rest.client.v1 import admin, directory, events, initial_sync -from synapse.rest.client.v1 import login as v1_login -from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher -from synapse.rest.client.v1 import register as v1_register -from synapse.rest.client.v1 import room, voip +from synapse.rest.client.v1 import ( + admin, + directory, + events, + initial_sync, + login as v1_login, + logout, + presence, + profile, + push_rule, + pusher, + room, + voip, +) from synapse.rest.client.v2_alpha import ( account, account_data, @@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import ( user_directory, ) +if not PY3: + from synapse.rest.client.v1_only import ( + register as v1_register, + ) + class ClientRestResource(JsonResource): """A resource for version 1 of the matrix client API.""" @@ -54,14 +71,22 @@ class ClientRestResource(JsonResource): def register_servlets(client_resource, hs): versions.register_servlets(client_resource) - # "v1" - room.register_servlets(hs, client_resource) + if not PY3: + # "v1" (Python 2 only) + v1_register.register_servlets(hs, client_resource) + + # Deprecated in r0 + initial_sync.register_servlets(hs, client_resource) + room.register_deprecated_servlets(hs, client_resource) + + # Partially deprecated in r0 events.register_servlets(hs, client_resource) - v1_register.register_servlets(hs, client_resource) + + # "v1" + "r0" + room.register_servlets(hs, client_resource) v1_login.register_servlets(hs, client_resource) profile.register_servlets(hs, client_resource) presence.register_servlets(hs, client_resource) - initial_sync.register_servlets(hs, client_resource) directory.register_servlets(hs, client_resource) voip.register_servlets(hs, client_resource) admin.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 2dc50e582b..99f6c6e3c3 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import hashlib +import hmac import logging from six.moves import http_client @@ -63,6 +65,125 @@ class UsersRestServlet(ClientV1RestServlet): defer.returnValue((200, ret)) +class UserRegisterServlet(ClientV1RestServlet): + """ + Attributes: + NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted + nonces (dict[str, int]): The nonces that we will accept. A dict of + nonce to the time it was generated, in int seconds. + """ + PATTERNS = client_path_patterns("/admin/register") + NONCE_TIMEOUT = 60 + + def __init__(self, hs): + super(UserRegisterServlet, self).__init__(hs) + self.handlers = hs.get_handlers() + self.reactor = hs.get_reactor() + self.nonces = {} + self.hs = hs + + def _clear_old_nonces(self): + """ + Clear out old nonces that are older than NONCE_TIMEOUT. + """ + now = int(self.reactor.seconds()) + + for k, v in list(self.nonces.items()): + if now - v > self.NONCE_TIMEOUT: + del self.nonces[k] + + def on_GET(self, request): + """ + Generate a new nonce. + """ + self._clear_old_nonces() + + nonce = self.hs.get_secrets().token_hex(64) + self.nonces[nonce] = int(self.reactor.seconds()) + return (200, {"nonce": nonce.encode('ascii')}) + + @defer.inlineCallbacks + def on_POST(self, request): + self._clear_old_nonces() + + if not self.hs.config.registration_shared_secret: + raise SynapseError(400, "Shared secret registration is not enabled") + + body = parse_json_object_from_request(request) + + if "nonce" not in body: + raise SynapseError( + 400, "nonce must be specified", errcode=Codes.BAD_JSON, + ) + + nonce = body["nonce"] + + if nonce not in self.nonces: + raise SynapseError( + 400, "unrecognised nonce", + ) + + # Delete the nonce, so it can't be reused, even if it's invalid + del self.nonces[nonce] + + if "username" not in body: + raise SynapseError( + 400, "username must be specified", errcode=Codes.BAD_JSON, + ) + else: + if (not isinstance(body['username'], str) or len(body['username']) > 512): + raise SynapseError(400, "Invalid username") + + username = body["username"].encode("utf-8") + if b"\x00" in username: + raise SynapseError(400, "Invalid username") + + if "password" not in body: + raise SynapseError( + 400, "password must be specified", errcode=Codes.BAD_JSON, + ) + else: + if (not isinstance(body['password'], str) or len(body['password']) > 512): + raise SynapseError(400, "Invalid password") + + password = body["password"].encode("utf-8") + if b"\x00" in password: + raise SynapseError(400, "Invalid password") + + admin = body.get("admin", None) + got_mac = body["mac"] + + want_mac = hmac.new( + key=self.hs.config.registration_shared_secret.encode(), + digestmod=hashlib.sha1, + ) + want_mac.update(nonce) + want_mac.update(b"\x00") + want_mac.update(username) + want_mac.update(b"\x00") + want_mac.update(password) + want_mac.update(b"\x00") + want_mac.update(b"admin" if admin else b"notadmin") + want_mac = want_mac.hexdigest() + + if not hmac.compare_digest(want_mac, got_mac): + raise SynapseError( + 403, "HMAC incorrect", + ) + + # Reuse the parts of RegisterRestServlet to reduce code duplication + from synapse.rest.client.v2_alpha.register import RegisterRestServlet + register = RegisterRestServlet(self.hs) + + (user_id, _) = yield register.registration_handler.register( + localpart=username.lower(), password=password, admin=bool(admin), + generate_token=False, + ) + + result = yield register._create_registration_details(user_id, body) + defer.returnValue((200, result)) + + class WhoisRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)") @@ -123,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): hs (synapse.server.HomeServer) """ super(PurgeHistoryRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() self.store = hs.get_datastore() @defer.inlineCallbacks @@ -198,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): errcode=Codes.BAD_JSON, ) - purge_id = yield self.handlers.message_handler.start_purge_history( + purge_id = yield self.pagination_handler.start_purge_history( room_id, token, delete_local_events=delete_local_events, ) @@ -220,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet): hs (synapse.server.HomeServer) """ super(PurgeHistoryStatusRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() @defer.inlineCallbacks def on_GET(self, request, purge_id): @@ -230,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - purge_status = self.handlers.message_handler.get_purge_status(purge_id) + purge_status = self.pagination_handler.get_purge_status(purge_id) if purge_status is None: raise NotFoundError("purge id '%s' not found" % purge_id) @@ -614,3 +735,4 @@ def register_servlets(hs, http_server): ShutdownRoomRestServlet(hs).register(http_server) QuarantineMediaInRoom(hs).register(http_server) ListMediaInRoom(hs).register(http_server) + UserRegisterServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 3d62447854..13c331550b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): self.handlers = hs.get_handlers() self.event_creation_hander = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() + self.message_handler = hs.get_message_handler() def register(self, http_server): # /room/$roomid/state/$eventtype @@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): format = parse_string(request, "format", default="content", allowed_values=["content", "event"]) - msg_handler = self.handlers.message_handler + msg_handler = self.message_handler data = yield msg_handler.get_room_data( user_id=requester.user.to_string(), room_id=room_id, @@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomMemberListRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): # TODO support Pagination stream API (limit/tokens) requester = yield self.auth.get_user_by_req(request) - handler = self.handlers.message_handler - events = yield handler.get_state_events( + events = yield self.message_handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), ) @@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(JoinedRoomMemberListRestServlet, self).__init__(hs) - self.message_handler = hs.get_handlers().message_handler + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomMessageListRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.pagination_handler = hs.get_pagination_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): @@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet): event_filter = Filter(json.loads(filter_json)) else: event_filter = None - handler = self.handlers.message_handler - msgs = yield handler.get_messages( + msgs = yield self.pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, @@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomStateRestServlet, self).__init__(hs) - self.handlers = hs.get_handlers() + self.message_handler = hs.get_message_handler() @defer.inlineCallbacks def on_GET(self, request, room_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) - handler = self.handlers.message_handler # Get all the current state for this room - events = yield handler.get_state_events( + events = yield self.message_handler.get_state_events( room_id=room_id, user_id=requester.user.to_string(), is_guest=requester.is_guest, @@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet): def __init__(self, hs): super(RoomEventContextServlet, self).__init__(hs) self.clock = hs.get_clock() - self.handlers = hs.get_handlers() + self.room_context_handler = hs.get_room_context_handler() @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): @@ -533,11 +531,20 @@ class RoomEventContextServlet(ClientV1RestServlet): limit = parse_integer(request, "limit", default=10) - results = yield self.handlers.room_context_handler.get_event_context( + # picking the API shape for symmetry with /messages + filter_bytes = parse_string(request, "filter") + if filter_bytes: + filter_json = urlparse.unquote(filter_bytes).decode("UTF-8") + event_filter = Filter(json.loads(filter_json)) + else: + event_filter = None + + results = yield self.room_context_handler.get_event_context( requester.user, room_id, event_id, limit, + event_filter, ) if not results: @@ -832,10 +839,13 @@ def register_servlets(hs, http_server): RoomSendEventRestServlet(hs).register(http_server) PublicRoomListRestServlet(hs).register(http_server) RoomStateRestServlet(hs).register(http_server) - RoomInitialSyncRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) JoinedRoomsRestServlet(hs).register(http_server) RoomEventServlet(hs).register(http_server) RoomEventContextServlet(hs).register(http_server) + + +def register_deprecated_servlets(hs, http_server): + RoomInitialSyncRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1_only/__init__.py b/synapse/rest/client/v1_only/__init__.py new file mode 100644 index 0000000000..936f902ace --- /dev/null +++ b/synapse/rest/client/v1_only/__init__.py @@ -0,0 +1,3 @@ +""" +REST APIs that are only used in v1 (the legacy API). +""" diff --git a/synapse/rest/client/v1_only/base.py b/synapse/rest/client/v1_only/base.py new file mode 100644 index 0000000000..9d4db7437c --- /dev/null +++ b/synapse/rest/client/v1_only/base.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This module contains base REST classes for constructing client v1 servlets. +""" + +import re + +from synapse.api.urls import CLIENT_PREFIX + + +def v1_only_client_path_patterns(path_regex, include_in_unstable=True): + """Creates a regex compiled client path with the correct client path + prefix. + + Args: + path_regex (str): The regex string to match. This should NOT have a ^ + as this will be prefixed. + Returns: + list of SRE_Pattern + """ + patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)] + if include_in_unstable: + unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable") + patterns.append(re.compile("^" + unstable_prefix + path_regex)) + return patterns diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1_only/register.py index 25a143af8d..3439c3c6d4 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1_only/register.py @@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils from synapse.api.constants import LoginType from synapse.api.errors import Codes, SynapseError from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request +from synapse.rest.client.v1.base import ClientV1RestServlet from synapse.types import create_requester -from .base import ClientV1RestServlet, client_path_patterns +from .base import v1_only_client_path_patterns logger = logging.getLogger(__name__) @@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet): handler doesn't have a concept of multi-stages or sessions. """ - PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False) + PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False) def __init__(self, hs): """ @@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet): """Handles user creation via a server-to-server interface """ - PATTERNS = client_path_patterns("/createUser$", releases=()) + PATTERNS = v1_only_client_path_patterns("/createUser$") def __init__(self, hs): super(CreateUserRestServlet, self).__init__(hs) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 1918897f68..d6cf915d86 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -643,7 +643,7 @@ class RegisterRestServlet(RestServlet): @defer.inlineCallbacks def _do_guest_registration(self, params): if not self.hs.config.allow_guest_access: - defer.returnValue((403, "Guest access is disabled")) + raise SynapseError(403, "Guest access is disabled") user_id, _ = yield self.registration_handler.register( generate_token=False, make_guest=True diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 30242c525a..174ad20123 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -35,6 +35,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.async import Linearizer from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination @@ -100,10 +101,15 @@ class MediaRepository(object): ) self.clock.looping_call( - self._update_recently_accessed, + self._start_update_recently_accessed, UPDATE_RECENTLY_ACCESSED_TS, ) + def _start_update_recently_accessed(self): + return run_as_background_process( + "update_recently_accessed_media", self._update_recently_accessed, + ) + @defer.inlineCallbacks def _update_recently_accessed(self): remote_media = self.recently_accessed_remotes diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index b70b15c4c2..27aa0def2f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -41,6 +41,7 @@ from synapse.http.server import ( wrap_json_request_handler, ) from synapse.http.servlet import parse_integer, parse_string +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.async import ObservableDeferred from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -81,7 +82,7 @@ class PreviewUrlResource(Resource): self._cache.start() self._cleaner_loop = self.clock.looping_call( - self._expire_url_cache_data, 10 * 1000 + self._start_expire_url_cache_data, 10 * 1000, ) def render_OPTIONS(self, request): @@ -371,6 +372,11 @@ class PreviewUrlResource(Resource): "etag": headers["ETag"][0] if "ETag" in headers else None, }) + def _start_expire_url_cache_data(self): + return run_as_background_process( + "expire_url_cache_data", self._expire_url_cache_data, + ) + @defer.inlineCallbacks def _expire_url_cache_data(self): """Clean up expired url cache content, media and thumbnails. diff --git a/synapse/secrets.py b/synapse/secrets.py new file mode 100644 index 0000000000..f05e9ea535 --- /dev/null +++ b/synapse/secrets.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Injectable secrets module for Synapse. + +See https://docs.python.org/3/library/secrets.html#module-secrets for the API +used in Python 3.6, and the API emulated in Python 2.7. +""" + +import sys + +# secrets is available since python 3.6 +if sys.version_info[0:2] >= (3, 6): + import secrets + + def Secrets(): + return secrets + +else: + import os + import binascii + + class Secrets(object): + def token_bytes(self, nbytes=32): + return os.urandom(nbytes) + + def token_hex(self, nbytes=32): + return binascii.hexlify(self.token_bytes(nbytes)) diff --git a/synapse/server.py b/synapse/server.py index 92bea96c5c..140be9ebe8 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.initial_sync import InitialSyncHandler -from synapse.handlers.message import EventCreationHandler +from synapse.handlers.message import EventCreationHandler, MessageHandler +from synapse.handlers.pagination import PaginationHandler from synapse.handlers.presence import PresenceHandler from synapse.handlers.profile import ProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.receipts import ReceiptsHandler -from synapse.handlers.room import RoomCreationHandler +from synapse.handlers.room import RoomContextHandler, RoomCreationHandler from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler @@ -74,6 +75,7 @@ from synapse.rest.media.v1.media_repository import ( MediaRepository, MediaRepositoryResource, ) +from synapse.secrets import Secrets from synapse.server_notices.server_notices_manager import ServerNoticesManager from synapse.server_notices.server_notices_sender import ServerNoticesSender from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender @@ -158,11 +160,15 @@ class HomeServer(object): 'groups_server_handler', 'groups_attestation_signing', 'groups_attestation_renewer', + 'secrets', 'spam_checker', 'room_member_handler', 'federation_registry', 'server_notices_manager', 'server_notices_sender', + 'message_handler', + 'pagination_handler', + 'room_context_handler', ] def __init__(self, hostname, reactor=None, **kwargs): @@ -405,6 +411,9 @@ class HomeServer(object): def build_groups_attestation_renewer(self): return GroupAttestionRenewer(self) + def build_secrets(self): + return Secrets() + def build_spam_checker(self): return SpamChecker(self) @@ -426,6 +435,15 @@ class HomeServer(object): return WorkerServerNoticesSender(self) return ServerNoticesSender(self) + def build_message_handler(self): + return MessageHandler(self) + + def build_pagination_handler(self): + return PaginationHandler(self) + + def build_room_context_handler(self): + return RoomContextHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/state.py b/synapse/state.py index 15a593d41c..033f55d967 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,7 +18,7 @@ import hashlib import logging from collections import namedtuple -from six import iteritems, itervalues +from six import iteritems, iterkeys, itervalues from frozendict import frozendict @@ -203,25 +203,27 @@ class StateHandler(object): # If this is an outlier, then we know it shouldn't have any current # state. Certainly store.get_current_state won't return any, and # persisting the event won't store the state group. - context = EventContext() if old_state: - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): - context.current_state_ids = dict(context.prev_state_ids) + current_state_ids = dict(prev_state_ids) key = (event.type, event.state_key) - context.current_state_ids[key] = event.event_id + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids else: - context.current_state_ids = {} - context.prev_state_ids = {} - context.prev_state_events = [] + current_state_ids = {} + prev_state_ids = {} # We don't store state for outliers, so we don't generate a state - # froup for it. - context.state_group = None + # group for it. + context = EventContext.with_state( + state_group=None, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + ) defer.returnValue(context) @@ -230,31 +232,35 @@ class StateHandler(object): # Let's just correctly fill out the context and create a # new state group for it. - context = EventContext() - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] if replaces != event.event_id: # Paranoia check event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids - context.state_group = yield self.store.store_state_group( + state_group = yield self.store.store_state_group( event.event_id, event.room_id, prev_group=None, delta_ids=None, - current_state_ids=context.current_state_ids, + current_state_ids=current_state_ids, + ) + + context = EventContext.with_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, ) - context.prev_state_events = [] defer.returnValue(context) logger.debug("calling resolve_state_groups from compute_event_context") @@ -262,47 +268,47 @@ class StateHandler(object): event.room_id, [e for e, _ in event.prev_events], ) - curr_state = entry.state + prev_state_ids = entry.state + prev_group = None + delta_ids = None - context = EventContext() - context.prev_state_ids = curr_state if event.is_state(): # If this is a state event then we need to create a new state # group for the state after this event. key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id if entry.state_group: # If the state at the event has a state group assigned then # we can use that as the prev group - context.prev_group = entry.state_group - context.delta_ids = { + prev_group = entry.state_group + delta_ids = { key: event.event_id } elif entry.prev_group: # If the state at the event only has a prev group, then we can # use that as a prev group too. - context.prev_group = entry.prev_group - context.delta_ids = dict(entry.delta_ids) - context.delta_ids[key] = event.event_id + prev_group = entry.prev_group + delta_ids = dict(entry.delta_ids) + delta_ids[key] = event.event_id - context.state_group = yield self.store.store_state_group( + state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + current_state_ids=current_state_ids, ) else: - context.current_state_ids = context.prev_state_ids - context.prev_group = entry.prev_group - context.delta_ids = entry.delta_ids + current_state_ids = prev_state_ids + prev_group = entry.prev_group + delta_ids = entry.delta_ids if entry.state_group is None: entry.state_group = yield self.store.store_state_group( @@ -310,13 +316,20 @@ class StateHandler(object): event.room_id, prev_group=entry.prev_group, delta_ids=entry.delta_ids, - current_state_ids=context.current_state_ids, + current_state_ids=current_state_ids, ) entry.state_id = entry.state_group - context.state_group = entry.state_group + state_group = entry.state_group + + context = EventContext.with_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + ) - context.prev_state_events = [] defer.returnValue(context) @defer.inlineCallbacks @@ -458,69 +471,39 @@ class StateResolutionHandler(object): "Resolving state for %s with %d groups", room_id, len(state_groups_ids) ) - # build a map from state key to the event_ids which set that state. - # dict[(str, str), set[str]) - state = {} + # start by assuming we won't have any conflicted state, and build up the new + # state map by iterating through the state groups. If we discover a conflict, + # we give up and instead use `resolve_events_with_factory`. + # + # XXX: is this actually worthwhile, or should we just let + # resolve_events_with_factory do it? + new_state = {} + conflicted_state = False for st in itervalues(state_groups_ids): for key, e_id in iteritems(st): - state.setdefault(key, set()).add(e_id) - - # build a map from state key to the event_ids which set that state, - # including only those where there are state keys in conflict. - conflicted_state = { - k: list(v) - for k, v in iteritems(state) - if len(v) > 1 - } + if key in new_state: + conflicted_state = True + break + new_state[key] = e_id + if conflicted_state: + break if conflicted_state: logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_factory( - list(state_groups_ids.values()), + list(itervalues(state_groups_ids)), event_map=event_map, state_map_factory=state_map_factory, ) - else: - new_state = { - key: e_ids.pop() for key, e_ids in iteritems(state) - } - with Measure(self.clock, "state.create_group_ids"): - # if the new state matches any of the input state groups, we can - # use that state group again. Otherwise we will generate a state_id - # which will be used as a cache key for future resolutions, but - # not get persisted. - state_group = None - new_state_event_ids = frozenset(itervalues(new_state)) - for sg, events in iteritems(state_groups_ids): - if new_state_event_ids == frozenset(e_id for e_id in events): - state_group = sg - break + # if the new state matches any of the input state groups, we can + # use that state group again. Otherwise we will generate a state_id + # which will be used as a cache key for future resolutions, but + # not get persisted. - # TODO: We want to create a state group for this set of events, to - # increase cache hits, but we need to make sure that it doesn't - # end up as a prev_group without being added to the database - - prev_group = None - delta_ids = None - for old_group, old_ids in iteritems(state_groups_ids): - if not set(new_state) - set(old_ids): - n_delta_ids = { - k: v - for k, v in iteritems(new_state) - if old_ids.get(k) != v - } - if not delta_ids or len(n_delta_ids) < len(delta_ids): - prev_group = old_group - delta_ids = n_delta_ids - - cache = _StateCacheEntry( - state=new_state, - state_group=state_group, - prev_group=prev_group, - delta_ids=delta_ids, - ) + with Measure(self.clock, "state.create_group_ids"): + cache = _make_state_cache_entry(new_state, state_groups_ids) if self._state_cache is not None: self._state_cache[group_names] = cache @@ -528,6 +511,70 @@ class StateResolutionHandler(object): defer.returnValue(cache) +def _make_state_cache_entry( + new_state, + state_groups_ids, +): + """Given a resolved state, and a set of input state groups, pick one to base + a new state group on (if any), and return an appropriately-constructed + _StateCacheEntry. + + Args: + new_state (dict[(str, str), str]): resolved state map (mapping from + (type, state_key) to event_id) + + state_groups_ids (dict[int, dict[(str, str), str]]): + map from state group id to the state in that state group + (where 'state' is a map from state key to event id) + + Returns: + _StateCacheEntry + """ + # if the new state matches any of the input state groups, we can + # use that state group again. Otherwise we will generate a state_id + # which will be used as a cache key for future resolutions, but + # not get persisted. + + # first look for exact matches + new_state_event_ids = set(itervalues(new_state)) + for sg, state in iteritems(state_groups_ids): + if len(new_state_event_ids) != len(state): + continue + + old_state_event_ids = set(itervalues(state)) + if new_state_event_ids == old_state_event_ids: + # got an exact match. + return _StateCacheEntry( + state=new_state, + state_group=sg, + ) + + # TODO: We want to create a state group for this set of events, to + # increase cache hits, but we need to make sure that it doesn't + # end up as a prev_group without being added to the database + + # failing that, look for the closest match. + prev_group = None + delta_ids = None + + for old_group, old_state in iteritems(state_groups_ids): + n_delta_ids = { + k: v + for k, v in iteritems(new_state) + if old_state.get(k) != v + } + if not delta_ids or len(n_delta_ids) < len(delta_ids): + prev_group = old_group + delta_ids = n_delta_ids + + return _StateCacheEntry( + state=new_state, + state_group=None, + prev_group=prev_group, + delta_ids=delta_ids, + ) + + def _ordered_events(events): def key_func(e): return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest() @@ -569,7 +616,7 @@ def _seperate(state_sets): with them in different state sets. Args: - state_sets(list[dict[(str, str), str]]): + state_sets(iterable[dict[(str, str), str]]): List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. @@ -583,10 +630,11 @@ def _seperate(state_sets): conflicted_state is a dict mapping (type, state_key) to a set of event ids for conflicted state keys. """ - unconflicted_state = dict(state_sets[0]) + state_set_iterator = iter(state_sets) + unconflicted_state = dict(next(state_set_iterator)) conflicted_state = {} - for state_set in state_sets[1:]: + for state_set in state_set_iterator: for key, value in iteritems(state_set): # Check if there is an unconflicted entry for the state key. unconflicted_value = unconflicted_state.get(key) @@ -647,7 +695,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): for event_id in event_ids ) if event_map is not None: - needed_events -= set(event_map.iterkeys()) + needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d conflicted events", len(needed_events)) @@ -668,7 +716,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): new_needed_events = set(itervalues(auth_events)) new_needed_events -= needed_events if event_map is not None: - new_needed_events -= set(event_map.iterkeys()) + new_needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d auth events", len(new_needed_events)) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index ba88a54979..3bd63cd195 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -66,6 +66,7 @@ class DataStore(RoomMemberStore, RoomStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, ApplicationServiceStore, + EventsStore, EventFederationStore, MediaRepositoryStore, RejectionsStore, @@ -73,7 +74,6 @@ class DataStore(RoomMemberStore, RoomStore, PusherStore, PushRuleStore, ApplicationServiceTransactionStore, - EventsStore, ReceiptsStore, EndToEndKeyStore, SearchStore, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 98dde77431..44f37b4c1e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -311,6 +311,12 @@ class SQLBaseStore(object): after_callbacks = [] exception_callbacks = [] + if LoggingContext.current_context() == LoggingContext.sentinel: + logger.warn( + "Starting db txn '%s' from sentinel context", + desc, + ) + try: result = yield self.runWithConnection( self._new_transaction, @@ -344,7 +350,7 @@ class SQLBaseStore(object): parent_context = LoggingContext.current_context() if parent_context == LoggingContext.sentinel: logger.warn( - "Running db txn from sentinel context: metrics will be lost", + "Starting db connection from sentinel context: metrics will be lost", ) parent_context = None diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 9f12b360bc..31248d5e06 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -22,7 +22,7 @@ from twisted.internet import defer from synapse.appservice import AppServiceTransaction from synapse.config.appservice import load_appservices -from synapse.storage.events import EventsWorkerStore +from synapse.storage.events_worker import EventsWorkerStore from ._base import SQLBaseStore diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index dc9eca7d15..5fe1ca2de7 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -19,6 +19,8 @@ from canonicaljson import json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process + from . import engines from ._base import SQLBaseStore @@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_handlers = {} self._all_done = False - @defer.inlineCallbacks def start_doing_background_updates(self): - logger.info("Starting background schema updates") + run_as_background_process( + "background_updates", self._run_background_updates, + ) + @defer.inlineCallbacks + def _run_background_updates(self): + logger.info("Starting background schema updates") while True: yield self.hs.get_clock().sleep( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index b78eda3413..b8cefd43d6 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -19,6 +19,7 @@ from six import iteritems from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import CACHE_SIZE_FACTOR from . import background_updates @@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self._batch_row_update[key] = (user_agent, device_id, now) def _update_client_ips_batch(self): - to_update = self._batch_row_update - self._batch_row_update = {} - return self.runInteraction( - "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update + def update(): + to_update = self._batch_row_update + self._batch_row_update = {} + return self.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, + to_update, + ) + + return run_as_background_process( + "update_client_ips", update, ) def _update_client_ips_batch_txn(self, txn, to_update): diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index ec68e39f1e..c0943ecf91 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from ._base import Cache, SQLBaseStore @@ -248,17 +249,31 @@ class DeviceStore(SQLBaseStore): def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id, content, stream_id): - self._simple_upsert_txn( - txn, - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - values={ - "content": json.dumps(content), - } - ) + if content.get("deleted"): + self._simple_delete_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + ) + + txn.call_after( + self.device_id_exists_cache.invalidate, (user_id, device_id,) + ) + else: + self._simple_upsert_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={ + "content": json.dumps(content), + } + ) txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,)) txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) @@ -366,7 +381,7 @@ class DeviceStore(SQLBaseStore): now_stream_id = max(stream_id for stream_id in itervalues(query_map)) devices = self._get_e2e_device_keys_txn( - txn, query_map.keys(), include_all_devices=True + txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True ) prev_sent_id_sql = """ @@ -393,12 +408,15 @@ class DeviceStore(SQLBaseStore): prev_id = stream_id - key_json = device.get("key_json", None) - if key_json: - result["keys"] = json.loads(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name + if device is not None: + key_json = device.get("key_json", None) + if key_json: + result["keys"] = json.loads(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + else: + result["deleted"] = True results.append(result) @@ -694,6 +712,9 @@ class DeviceStore(SQLBaseStore): logger.info("Pruned %d device list outbound pokes", txn.rowcount) - return self.runInteraction( - "_prune_old_outbound_device_pokes", _prune_txn + return run_as_background_process( + "prune_old_outbound_device_pokes", + self.runInteraction, + "_prune_old_outbound_device_pokes", + _prune_txn, ) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 7ae5c65482..523b4360c3 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -64,12 +64,18 @@ class EndToEndKeyStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_e2e_device_keys(self, query_list, include_all_devices=False): + def get_e2e_device_keys( + self, query_list, include_all_devices=False, + include_deleted_devices=False, + ): """Fetch a list of device keys. Args: query_list(list): List of pairs of user_ids and device_ids. include_all_devices (bool): whether to include entries for devices that don't have device keys + include_deleted_devices (bool): whether to include null entries for + devices which no longer exist (but were in the query_list). + This option only takes effect if include_all_devices is true. Returns: Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". @@ -79,7 +85,7 @@ class EndToEndKeyStore(SQLBaseStore): results = yield self.runInteraction( "get_e2e_device_keys", self._get_e2e_device_keys_txn, - query_list, include_all_devices, + query_list, include_all_devices, include_deleted_devices, ) for user_id, device_keys in iteritems(results): @@ -88,10 +94,19 @@ class EndToEndKeyStore(SQLBaseStore): defer.returnValue(results) - def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices): + def _get_e2e_device_keys_txn( + self, txn, query_list, include_all_devices=False, + include_deleted_devices=False, + ): query_clauses = [] query_params = [] + if include_all_devices is False: + include_deleted_devices = False + + if include_deleted_devices: + deleted_devices = set(query_list) + for (user_id, device_id) in query_list: query_clause = "user_id = ?" query_params.append(user_id) @@ -119,8 +134,14 @@ class EndToEndKeyStore(SQLBaseStore): result = {} for row in rows: + if include_deleted_devices: + deleted_devices.remove((row["user_id"], row["device_id"])) result.setdefault(row["user_id"], {})[row["device_id"]] = row + if include_deleted_devices: + for user_id, device_id in deleted_devices: + result.setdefault(user_id, {})[device_id] = None + return result @defer.inlineCallbacks diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8d366d1b91..8bd35df119 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -23,8 +23,9 @@ from unpaddedbase64 import encode_base64 from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore -from synapse.storage.events import EventsWorkerStore +from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.signatures import SignatureWorkerStore from synapse.util.caches.descriptors import cached @@ -113,9 +114,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, sql = ( "SELECT b.event_id, MAX(e.depth) FROM events as e" " INNER JOIN event_edges as g" - " ON g.event_id = e.event_id AND g.room_id = e.room_id" + " ON g.event_id = e.event_id" " INNER JOIN event_backward_extremities as b" - " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id" + " ON g.prev_event_id = b.event_id" " WHERE b.room_id = ? AND g.is_state is ?" " GROUP BY b.event_id" ) @@ -329,8 +330,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, "SELECT depth, prev_event_id FROM event_edges" " INNER JOIN events" " ON prev_event_id = events.event_id" - " AND event_edges.room_id = events.room_id" - " WHERE event_edges.room_id = ? AND event_edges.event_id = ?" + " WHERE event_edges.event_id = ?" " AND event_edges.is_state = ?" " LIMIT ?" ) @@ -364,7 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, txn.execute( query, - (room_id, event_id, False, limit - len(event_results)) + (event_id, False, limit - len(event_results)) ) for row in txn: @@ -401,7 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, query = ( "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? AND is_state = ? " + "WHERE event_id = ? AND is_state = ? " "LIMIT ?" ) @@ -410,7 +410,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, for event_id in front: txn.execute( query, - (room_id, event_id, False, limit - len(event_results)) + (event_id, False, limit - len(event_results)) ) for e_id, in txn: @@ -446,7 +446,7 @@ class EventFederationStore(EventFederationWorkerStore): ) hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + self._delete_old_forward_extrem_cache, 60 * 60 * 1000, ) def _update_min_depth_for_room_txn(self, txn, room_id, depth): @@ -548,9 +548,11 @@ class EventFederationStore(EventFederationWorkerStore): sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) ) - return self.runInteraction( + return run_as_background_process( + "delete_old_forward_extrem_cache", + self.runInteraction, "_delete_old_forward_extrem_cache", - _delete_old_forward_extrem_cache_txn + _delete_old_forward_extrem_cache_txn, ) def clean_room_for_join(self, room_id): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 29b511ae5e..6840320641 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,7 @@ from canonicaljson import json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction, SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks @@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore): "Error removing push actions after event persistence failure", ) - @defer.inlineCallbacks def _find_stream_orderings_for_times(self): - yield self.runInteraction( + return run_as_background_process( + "event_push_action_stream_orderings", + self.runInteraction, "_find_stream_orderings_for_times", - self._find_stream_orderings_for_times_txn + self._find_stream_orderings_for_times_txn, ) def _find_stream_orderings_for_times_txn(self, txn): @@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): self._doing_notif_rotation = False self._rotate_notif_loop = self._clock.looping_call( - self._rotate_notifs, 30 * 60 * 1000 + self._start_rotate_notifs, 30 * 60 * 1000, ) def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, @@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? """, (room_id, user_id, stream_ordering)) + def _start_rotate_notifs(self): + return run_as_background_process("rotate_notifs", self._rotate_notifs) + @defer.inlineCallbacks def _rotate_notifs(self): if self._doing_notif_rotation or self.stream_ordering_day_ago is None: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2aaab0d02c..c98e524ba1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ import logging from collections import OrderedDict, deque, namedtuple from functools import wraps -from six import iteritems, itervalues +from six import iteritems from six.moves import range from canonicaljson import json @@ -33,6 +33,9 @@ from synapse.api.errors import SynapseError # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.background_updates import BackgroundUpdateStore +from synapse.storage.event_federation import EventFederationStore from synapse.storage.events_worker import EventsWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async import ObservableDeferred @@ -141,25 +144,22 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: - # handle_queue_loop runs in the sentinel logcontext, so - # there is no need to preserve_fn when running the - # callbacks on the deferred. try: ret = yield per_item_callback(item) - item.deferred.callback(ret) except Exception: - item.deferred.errback() + with PreserveLoggingContext(): + item.deferred.errback() + else: + with PreserveLoggingContext(): + item.deferred.callback(ret) finally: queue = self._event_persist_queues.pop(room_id, None) if queue: self._event_persist_queues[room_id] = queue self._currently_persisting_rooms.discard(room_id) - # set handle_queue_loop off on the background. We don't want to - # attribute work done in it to the current request, so we drop the - # logcontext altogether. - with PreserveLoggingContext(): - handle_queue_loop() + # set handle_queue_loop off in the background + run_as_background_process("persist_events", handle_queue_loop) def _get_drainining_queue(self, room_id): queue = self._event_persist_queues.setdefault(room_id, deque()) @@ -195,7 +195,9 @@ def _retry_on_integrity_error(func): return f -class EventsStore(EventsWorkerStore): +# inherits from EventFederationStore so that we can call _update_backward_extremities +# and _handle_mult_prev_events (though arguably those could both be moved in here) +class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -345,11 +347,14 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties = {} # map room_id->(type,state_key)->event_id tracking the full - # state in each room after adding these events + # state in each room after adding these events. + # This is simply used to prefill the get_current_state_ids + # cache current_state_for_room = {} - # map room_id->(to_delete, to_insert) where each entry is - # a map (type,key)->event_id giving the state delta in each + # map room_id->(to_delete, to_insert) where to_delete is a list + # of type/state keys to remove from current state, and to_insert + # is a map (type,key)->event_id giving the state delta in each # room state_delta_for_room = {} @@ -419,19 +424,40 @@ class EventsStore(EventsWorkerStore): logger.info( "Calculating state delta for room %s", room_id, ) - current_state = yield self._get_new_state_after_events( - room_id, - ev_ctx_rm, - latest_event_ids, - new_latest_event_ids, - ) + with Measure( + self._clock, + "persist_events.get_new_state_after_events", + ): + res = yield self._get_new_state_after_events( + room_id, + ev_ctx_rm, + latest_event_ids, + new_latest_event_ids, + ) + current_state, delta_ids = res + + # If either are not None then there has been a change, + # and we need to work out the delta (or use that + # given) + if delta_ids is not None: + # If there is a delta we know that we've + # only added or replaced state, never + # removed keys entirely. + state_delta_for_room[room_id] = ([], delta_ids) + elif current_state is not None: + with Measure( + self._clock, + "persist_events.calculate_state_delta", + ): + delta = yield self._calculate_state_delta( + room_id, current_state, + ) + state_delta_for_room[room_id] = delta + + # If we have the current_state then lets prefill + # the cache with it. if current_state is not None: current_state_for_room[room_id] = current_state - delta = yield self._calculate_state_delta( - room_id, current_state, - ) - if delta is not None: - state_delta_for_room[room_id] = delta yield self.runInteraction( "persist_events", @@ -498,7 +524,6 @@ class EventsStore(EventsWorkerStore): iterable=list(new_latest_event_ids), retcols=["prev_event_id"], keyvalues={ - "room_id": room_id, "is_state": False, }, desc="_calculate_new_extremeties", @@ -530,9 +555,15 @@ class EventsStore(EventsWorkerStore): the new forward extremities for the room. Returns: - Deferred[dict[(str,str), str]|None]: - None if there are no changes to the room state, or - a dict of (type, state_key) -> event_id]. + Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]: + Returns a tuple of two state maps, the first being the full new current + state and the second being the delta to the existing current state. + If both are None then there has been no change. + + If there has been a change then we only return the delta if its + already been calculated. Conversely if we do know the delta then + the new current state is only returned if we've already calculated + it. """ if not new_latest_event_ids: @@ -540,18 +571,32 @@ class EventsStore(EventsWorkerStore): # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} + + # Map from (prev state group, new state group) -> delta state dict + state_group_deltas = {} + for ev, ctx in events_context: if ctx.state_group is None: - # I don't think this can happen, but let's double-check - raise Exception( - "Context for new extremity event %s has no state " - "group" % (ev.event_id, ), - ) + # This should only happen for outlier events. + if not ev.internal_metadata.is_outlier(): + raise Exception( + "Context for new event %s has no state " + "group" % (ev.event_id, ), + ) + continue if ctx.state_group in state_groups_map: continue - state_groups_map[ctx.state_group] = ctx.current_state_ids + # We're only interested in pulling out state that has already + # been cached in the context. We'll pull stuff out of the DB later + # if necessary. + current_state_ids = ctx.get_cached_current_state_ids() + if current_state_ids is not None: + state_groups_map[ctx.state_group] = current_state_ids + + if ctx.prev_group: + state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can @@ -566,7 +611,7 @@ class EventsStore(EventsWorkerStore): for event_id in new_latest_event_ids: # First search in the list of new events we're adding. for ev, ctx in events_context: - if event_id == ev.event_id: + if event_id == ev.event_id and ctx.state_group is not None: event_id_to_state_group[event_id] = ctx.state_group break else: @@ -594,7 +639,26 @@ class EventsStore(EventsWorkerStore): # If they old and new groups are the same then we don't need to do # anything. if old_state_groups == new_state_groups: - return + defer.returnValue((None, None)) + + if len(new_state_groups) == 1 and len(old_state_groups) == 1: + # If we're going from one state group to another, lets check if + # we have a delta for that transition. If we do then we can just + # return that. + + new_state_group = next(iter(new_state_groups)) + old_state_group = next(iter(old_state_groups)) + + delta_ids = state_group_deltas.get( + (old_state_group, new_state_group,), None + ) + if delta_ids is not None: + # We have a delta from the existing to new current state, + # so lets just return that. If we happen to already have + # the current state in memory then lets also return that, + # but it doesn't matter if we don't. + new_state = state_groups_map.get(new_state_group) + defer.returnValue((new_state, delta_ids)) # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -606,7 +670,7 @@ class EventsStore(EventsWorkerStore): if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - defer.returnValue(state_groups_map[new_state_groups.pop()]) + defer.returnValue((state_groups_map[new_state_groups.pop()], None)) # Ok, we need to defer to the state handler to resolve our state sets. @@ -625,7 +689,7 @@ class EventsStore(EventsWorkerStore): room_id, state_groups, events_map, get_events ) - defer.returnValue(res.state) + defer.returnValue((res.state, None)) @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): @@ -634,28 +698,20 @@ class EventsStore(EventsWorkerStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, - i.e. (type, state_key) -> event_id. `to_delete` are the entries to - first be deleted from current_state_events, `to_insert` are entries - to insert. + tuple[list, dict] (to_delete, to_insert): where to_delete are the + type/state_keys to remove from current_state_events and `to_insert` + are the updates to current_state_events. """ existing_state = yield self.get_current_state_ids(room_id) - existing_events = set(itervalues(existing_state)) - new_events = set(ev_id for ev_id in itervalues(current_state)) - changed_events = existing_events ^ new_events - - if not changed_events: - return + to_delete = [ + key for key in existing_state + if key not in current_state + ] - to_delete = { - key: ev_id for key, ev_id in iteritems(existing_state) - if ev_id in changed_events - } - events_to_insert = (new_events - existing_events) to_insert = { key: ev_id for key, ev_id in iteritems(current_state) - if ev_id in events_to_insert + if ev_id != existing_state.get(key) } defer.returnValue((to_delete, to_insert)) @@ -678,10 +734,10 @@ class EventsStore(EventsWorkerStore): delete_existing (bool): True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. - state_delta_for_room (dict[str, (list[str], list[str])]): + state_delta_for_room (dict[str, (list, dict)]): The current-state delta for each room. For each room, a tuple - (to_delete, to_insert), being a list of event ids to be removed - from the current state, and a list of event ids to be added to + (to_delete, to_insert), being a list of type/state keys to be + removed from the current state, and a state set to be added to the current state. new_forward_extremeties (dict[str, list[str]]): The new forward extremities for each room. For each room, a @@ -759,9 +815,46 @@ class EventsStore(EventsWorkerStore): def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple + + # First we add entries to the current_state_delta_stream. We + # do this before updating the current_state_events table so + # that we can use it to calculate the `prev_event_id`. (This + # allows us to not have to pull out the existing state + # unnecessarily). + sql = """ + INSERT INTO current_state_delta_stream + (stream_id, room_id, type, state_key, event_id, prev_event_id) + SELECT ?, ?, ?, ?, ?, ( + SELECT event_id FROM current_state_events + WHERE room_id = ? AND type = ? AND state_key = ? + ) + """ + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, None, + room_id, etype, state_key, + ) + for etype, state_key in to_delete + # We sanity check that we're deleting rather than updating + if (etype, state_key) not in to_insert + )) + txn.executemany(sql, ( + ( + max_stream_order, room_id, etype, state_key, ev_id, + room_id, etype, state_key, + ) + for (etype, state_key), ev_id in iteritems(to_insert) + )) + + # Now we actually update the current_state_events table + txn.executemany( - "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in itervalues(to_delete)], + "DELETE FROM current_state_events" + " WHERE room_id = ? AND type = ? AND state_key = ?", + ( + (room_id, etype, state_key) + for etype, state_key in itertools.chain(to_delete, to_insert) + ), ) self._simple_insert_many_txn( @@ -778,25 +871,6 @@ class EventsStore(EventsWorkerStore): ], ) - state_deltas = {key: None for key in to_delete} - state_deltas.update(to_insert) - - self._simple_insert_many_txn( - txn, - table="current_state_delta_stream", - values=[ - { - "stream_id": max_stream_order, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": ev_id, - "prev_event_id": to_delete.get(key, None), - } - for key, ev_id in iteritems(state_deltas) - ] - ) - txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, room_id, max_stream_order, @@ -810,7 +884,8 @@ class EventsStore(EventsWorkerStore): # and which we have added, then we invlidate the caches for all # those users. members_changed = set( - state_key for ev_type, state_key in state_deltas + state_key + for ev_type, state_key in itertools.chain(to_delete, to_insert) if ev_type == EventTypes.Member ) @@ -1066,7 +1141,7 @@ class EventsStore(EventsWorkerStore): ): txn.executemany( "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] + [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts] ) def _store_event_txn(self, txn, events_and_contexts): @@ -1117,7 +1192,6 @@ class EventsStore(EventsWorkerStore): "type": event.type, "processed": True, "outlier": event.internal_metadata.is_outlier(), - "content": encode_json(event.content).decode("UTF-8"), "origin_server_ts": int(event.origin_server_ts), "received_ts": self._clock.time_msec(), "sender": event.sender, diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 67433606c6..f28239a808 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -25,6 +25,7 @@ from synapse.events import EventBase # noqa: F401 from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import ( LoggingContext, PreserveLoggingContext, @@ -322,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore): should_start = False if should_start: - with PreserveLoggingContext(): - self.runWithConnection( - self._do_fetch - ) + run_as_background_process( + "fetch_events", + self.runWithConnection, + self._do_fetch, + ) logger.debug("Loading %d events", len(events)) with PreserveLoggingContext(): diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index be655d287b..6a5028961d 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -21,7 +21,6 @@ from canonicaljson import json from twisted.internet import defer -from synapse.api.constants import EventTypes from synapse.push.baserules import list_with_base_rules from synapse.storage.appservice import ApplicationServiceWorkerStore from synapse.storage.pusher import PusherWorkerStore @@ -186,6 +185,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, defer.returnValue(results) + @defer.inlineCallbacks def bulk_get_push_rules_for_room(self, event, context): state_group = context.state_group if not state_group: @@ -195,9 +195,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._bulk_get_push_rules_for_room( - event.room_id, state_group, context.current_state_ids, event=event + current_state_ids = yield context.get_current_state_ids(self) + result = yield self._bulk_get_push_rules_for_room( + event.room_id, state_group, current_state_ids, event=event ) + defer.returnValue(result) @cachedInlineCallbacks(num_args=2, cache_context=True) def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids, @@ -247,18 +249,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, if uid in local_users_in_room: user_ids.add(uid) - forgotten = yield self.who_forgot_in_room( - event.room_id, on_invalidate=cache_context.invalidate, - ) - - for row in forgotten: - user_id = row["user_id"] - event_id = row["event_id"] - - mem_id = current_state_ids.get((EventTypes.Member, user_id), None) - if event_id == mem_id: - user_ids.discard(user_id) - rules_by_user = yield self.bulk_get_push_rules( user_ids, on_invalidate=cache_context.invalidate, ) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index cc273a57b2..8443bd4c1b 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore): ) if newly_inserted: - self.runInteraction( + yield self.runInteraction( "add_pusher", self._invalidate_cache_and_stream, self.get_if_user_has_pusher, (user_id,) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 02a802bed9..10dce21cea 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -24,7 +24,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.storage.events import EventsWorkerStore +from synapse.storage.events_worker import EventsWorkerStore from synapse.types import get_domain_from_id from synapse.util.async import Linearizer from synapse.util.caches import intern_string @@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): defer.returnValue(user_who_share_room) + @defer.inlineCallbacks def get_joined_users_from_context(self, event, context): state_group = context.state_group if not state_group: @@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_users_from_context( - event.room_id, state_group, context.current_state_ids, + current_state_ids = yield context.get_current_state_ids(self) + result = yield self._get_joined_users_from_context( + event.room_id, state_group, current_state_ids, event=event, context=context, ) + defer.returnValue(result) def get_joined_users_from_state(self, room_id, state_entry): state_group = state_entry.state_group @@ -458,17 +461,29 @@ class RoomMemberWorkerStore(EventsWorkerStore): def _get_joined_hosts_cache(self, room_id): return _JoinedHostsCache(self, room_id) - @cached() - def who_forgot_in_room(self, room_id): - return self._simple_select_list( - table="room_memberships", - retcols=("user_id", "event_id"), - keyvalues={ - "room_id": room_id, - "forgotten": 1, - }, - desc="who_forgot" - ) + @cachedInlineCallbacks(num_args=2) + def did_forget(self, user_id, room_id): + """Returns whether user_id has elected to discard history for room_id. + + Returns False if they have since re-joined.""" + def f(txn): + sql = ( + "SELECT" + " COUNT(*)" + " FROM" + " room_memberships" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + " AND" + " forgotten = 0" + ) + txn.execute(sql, (user_id, room_id)) + rows = txn.fetchall() + return rows[0][0] + count = yield self.runInteraction("did_forget_membership", f) + defer.returnValue(count == 0) class RoomMemberStore(RoomMemberWorkerStore): @@ -577,36 +592,11 @@ class RoomMemberStore(RoomMemberWorkerStore): ) txn.execute(sql, (user_id, room_id)) - txn.call_after(self.did_forget.invalidate, (user_id, room_id)) self._invalidate_cache_and_stream( - txn, self.who_forgot_in_room, (room_id,) + txn, self.did_forget, (user_id, room_id,), ) return self.runInteraction("forget_membership", f) - @cachedInlineCallbacks(num_args=2) - def did_forget(self, user_id, room_id): - """Returns whether user_id has elected to discard history for room_id. - - Returns False if they have since re-joined.""" - def f(txn): - sql = ( - "SELECT" - " COUNT(*)" - " FROM" - " room_memberships" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - " AND" - " forgotten = 0" - ) - txn.execute(sql, (user_id, room_id)) - rows = txn.fetchall() - return rows[0][0] - count = yield self.runInteraction("did_forget_membership", f) - defer.returnValue(count == 0) - @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py new file mode 100644 index 0000000000..7d27342e39 --- /dev/null +++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +We want to stop populating 'event.content', so we need to make it nullable. + +If this has to be rolled back, then the following should populate the missing data: + +Postgres: + + UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej + WHERE ej.event_id = events.event_id AND + stream_ordering < ( + SELECT stream_ordering FROM events WHERE content IS NOT NULL + ORDER BY stream_ordering LIMIT 1 + ); + + UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej + WHERE ej.event_id = events.event_id AND + stream_ordering > ( + SELECT stream_ordering FROM events WHERE content IS NOT NULL + ORDER BY stream_ordering DESC LIMIT 1 + ); + +SQLite: + + UPDATE events SET content=( + SELECT json_extract(json,'$.content') FROM event_json ej + WHERE ej.event_id = events.event_id + ) + WHERE + stream_ordering < ( + SELECT stream_ordering FROM events WHERE content IS NOT NULL + ORDER BY stream_ordering LIMIT 1 + ) + OR stream_ordering > ( + SELECT stream_ordering FROM events WHERE content IS NOT NULL + ORDER BY stream_ordering DESC LIMIT 1 + ); + +""" + +import logging + +from synapse.storage.engines import PostgresEngine + +logger = logging.getLogger(__name__) + + +def run_create(cur, database_engine, *args, **kwargs): + pass + + +def run_upgrade(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + cur.execute(""" + ALTER TABLE events ALTER COLUMN content DROP NOT NULL; + """) + return + + # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html + + cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'") + (oldsql,) = cur.fetchone() + + sql = oldsql.replace("content TEXT NOT NULL", "content TEXT") + if sql == oldsql: + raise Exception("Couldn't find null constraint to drop in %s" % oldsql) + + logger.info("Replacing definition of 'events' with: %s", sql) + + cur.execute("PRAGMA schema_version") + (oldver,) = cur.fetchone() + cur.execute("PRAGMA writable_schema=ON") + cur.execute( + "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'", + (sql, ), + ) + cur.execute("PRAGMA schema_version=%i" % (oldver+1,)) + cur.execute("PRAGMA writable_schema=OFF") diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql index 52eec88357..6b5a5a88fa 100644 --- a/synapse/storage/schema/full_schemas/16/event_edges.sql +++ b/synapse/storage/schema/full_schemas/16/event_edges.sql @@ -37,7 +37,8 @@ CREATE TABLE IF NOT EXISTS event_edges( event_id TEXT NOT NULL, prev_event_id TEXT NOT NULL, room_id TEXT NOT NULL, - is_state BOOL NOT NULL, + is_state BOOL NOT NULL, -- true if this is a prev_state edge rather than a regular + -- event dag edge. UNIQUE (event_id, prev_event_id, room_id, is_state) ); diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql index ba5346806e..5f5cb8d01d 100644 --- a/synapse/storage/schema/full_schemas/16/im.sql +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -19,7 +19,12 @@ CREATE TABLE IF NOT EXISTS events( event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, - content TEXT NOT NULL, + + -- 'content' used to be created NULLable, but as of delta 50 we drop that constraint. + -- the hack we use to drop the constraint doesn't work for an in-memory sqlite + -- database, which breaks the sytests. Hence, we no longer make it nullable. + content TEXT, + unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 89a05c4618..b27b3ae144 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -186,7 +186,17 @@ class StateGroupWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_state_groups_from_groups(self, groups, types): - """Returns dictionary state_group -> (dict of (type, state_key) -> event id) + """Returns the state groups for a given set of groups, filtering on + types of state events. + + Args: + groups(list[int]): list of state group IDs to query + types (Iterable[str, str|None]|None): list of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. If None, all types are returned. + + Returns: + dictionary state_group -> (dict of (type, state_key) -> event id) """ results = {} @@ -200,8 +210,11 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue(results) - def _get_state_groups_from_groups_txn(self, txn, groups, types=None): + def _get_state_groups_from_groups_txn( + self, txn, groups, types=None, + ): results = {group: {} for group in groups} + if types is not None: types = list(set(types)) # deduplicate types list @@ -239,7 +252,7 @@ class StateGroupWorkerStore(SQLBaseStore): # Turns out that postgres doesn't like doing a list of OR's and # is about 1000x slower, so we just issue a query for each specific # type seperately. - if types: + if types is not None: clause_to_args = [ ( "AND type = ? AND state_key = ?", @@ -278,6 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore): else: where_clauses.append("(type = ? AND state_key = ?)") where_args.extend([typ[0], typ[1]]) + where_clause = "AND (%s)" % (" OR ".join(where_clauses)) else: where_clause = "" @@ -332,16 +346,20 @@ class StateGroupWorkerStore(SQLBaseStore): return results @defer.inlineCallbacks - def get_state_for_events(self, event_ids, types): + def get_state_for_events(self, event_ids, types, filtered_types=None): """Given a list of event_ids and type tuples, return a list of state dicts for each event. The state dicts will only have the type/state_keys that are in the `types` list. Args: - event_ids (list) - types (list): List of (type, state_key) tuples which are used to - filter the state fetched. `state_key` may be None, which matches - any `state_key` + event_ids (list[string]) + types (list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: deferred: A list of dicts corresponding to the event_ids given. @@ -352,7 +370,7 @@ class StateGroupWorkerStore(SQLBaseStore): ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types) + group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) state_event_map = yield self.get_events( [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], @@ -371,15 +389,19 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_ids_for_events(self, event_ids, types=None): + def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): """ Get the state dicts corresponding to a list of events Args: event_ids(list(str)): events whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from event_id -> (type, state_key) -> state_event @@ -389,7 +411,7 @@ class StateGroupWorkerStore(SQLBaseStore): ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types) + group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) event_to_state = { event_id: group_to_state[group] @@ -399,37 +421,45 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_for_event(self, event_id, types=None): + def get_state_for_event(self, event_id, types=None, filtered_types=None): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_for_events([event_id], types) + state_map = yield self.get_state_for_events([event_id], types, filtered_types) defer.returnValue(state_map[event_id]) @defer.inlineCallbacks - def get_state_ids_for_event(self, event_id, types=None): + def get_state_ids_for_event(self, event_id, types=None, filtered_types=None): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_ids_for_events([event_id], types) + state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types) defer.returnValue(state_map[event_id]) @cached(max_entries=50000) @@ -460,56 +490,73 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) - def _get_some_state_from_cache(self, group, types): + def _get_some_state_from_cache(self, group, types, filtered_types=None): """Checks if group is in cache. See `_get_state_for_groups` - Returns 3-tuple (`state_dict`, `missing_types`, `got_all`). - `missing_types` is the list of types that aren't in the cache for that - group. `got_all` is a bool indicating if we successfully retrieved all + Args: + group(int): The state group to lookup + types(list[str, str|None]): List of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. + + Returns 2-tuple (`state_dict`, `got_all`). + `got_all` is a bool indicating if we successfully retrieved all requests state from the cache, if False we need to query the DB for the missing state. - - Args: - group: The state group to lookup - types (list): List of 2-tuples of the form (`type`, `state_key`), - where a `state_key` of `None` matches all state_keys for the - `type`. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} - missing_types = set() + + # tracks whether any of ourrequested types are missing from the cache + missing_types = False for typ, state_key in types: key = (typ, state_key) - if state_key is None: + + if ( + state_key is None or + (filtered_types is not None and typ not in filtered_types) + ): type_to_key[typ] = None - missing_types.add(key) + # we mark the type as missing from the cache because + # when the cache was populated it might have been done with a + # restricted set of state_keys, so the wildcard will not work + # and the cache may be incomplete. + missing_types = True else: if type_to_key.get(typ, object()) is not None: type_to_key.setdefault(typ, set()).add(state_key) if key not in state_dict_ids and key not in known_absent: - missing_types.add(key) + missing_types = True sentinel = object() def include(typ, state_key): valid_state_keys = type_to_key.get(typ, sentinel) if valid_state_keys is sentinel: - return False + return filtered_types is not None and typ not in filtered_types if valid_state_keys is None: return True if state_key in valid_state_keys: return True return False - got_all = is_all or not missing_types + got_all = is_all + if not got_all: + # the cache is incomplete. We may still have got all the results we need, if + # we don't have any wildcards in the match list. + if not missing_types and filtered_types is None: + got_all = True return { k: v for k, v in iteritems(state_dict_ids) if include(k[0], k[1]) - }, missing_types, got_all + }, got_all def _get_all_state_from_cache(self, group): """Checks if group is in cache. See `_get_state_for_groups` @@ -526,7 +573,7 @@ class StateGroupWorkerStore(SQLBaseStore): return state_dict_ids, is_all @defer.inlineCallbacks - def _get_state_for_groups(self, groups, types=None): + def _get_state_for_groups(self, groups, types=None, filtered_types=None): """Gets the state at each of a list of state groups, optionally filtering by type/state_key @@ -540,6 +587,9 @@ class StateGroupWorkerStore(SQLBaseStore): Otherwise, each entry should be a `(type, state_key)` tuple to include in the response. A `state_key` of None is a wildcard meaning that we require all state with that type. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: Deferred[dict[int, dict[(type, state_key), EventBase]]] @@ -551,8 +601,8 @@ class StateGroupWorkerStore(SQLBaseStore): missing_groups = [] if types is not None: for group in set(groups): - state_dict_ids, _, got_all = self._get_some_state_from_cache( - group, types, + state_dict_ids, got_all = self._get_some_state_from_cache( + group, types, filtered_types ) results[group] = state_dict_ids @@ -579,13 +629,13 @@ class StateGroupWorkerStore(SQLBaseStore): # cache. Hence, if we are doing a wildcard lookup, populate the # cache fully so that we can do an efficient lookup next time. - if types and any(k is None for (t, k) in types): + if filtered_types or (types and any(k is None for (t, k) in types)): types_to_fetch = None else: types_to_fetch = types group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types_to_fetch, + missing_groups, types_to_fetch ) for group, group_state_dict in iteritems(group_to_state_dict): @@ -595,7 +645,10 @@ class StateGroupWorkerStore(SQLBaseStore): if types: for k, v in iteritems(group_state_dict): (typ, _) = k - if k in types or (typ, None) in types: + if ( + (k in types or (typ, None) in types) or + (filtered_types and typ not in filtered_types) + ): state_dict[k] = v else: state_dict.update(group_state_dict) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 66856342f0..b9f2b74ac6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,7 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore from synapse.storage.engines import PostgresEngine -from synapse.storage.events import EventsWorkerStore +from synapse.storage.events_worker import EventsWorkerStore from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -527,7 +527,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) @defer.inlineCallbacks - def get_events_around(self, room_id, event_id, before_limit, after_limit): + def get_events_around( + self, room_id, event_id, before_limit, after_limit, event_filter=None, + ): """Retrieve events and pagination tokens around a given event in a room. @@ -536,6 +538,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event_id (str) before_limit (int) after_limit (int) + event_filter (Filter|None) Returns: dict @@ -543,7 +546,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results = yield self.runInteraction( "get_events_around", self._get_events_around_txn, - room_id, event_id, before_limit, after_limit + room_id, event_id, before_limit, after_limit, event_filter, ) events_before = yield self._get_events( @@ -563,7 +566,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "end": results["after"]["token"], }) - def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit): + def _get_events_around_txn( + self, txn, room_id, event_id, before_limit, after_limit, event_filter, + ): """Retrieves event_ids and pagination tokens around a given event in a room. @@ -572,6 +577,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event_id (str) before_limit (int) after_limit (int) + event_filter (Filter|None) Returns: dict @@ -601,11 +607,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows, start_token = self._paginate_room_events_txn( txn, room_id, before_token, direction='b', limit=before_limit, + event_filter=event_filter, ) events_before = [r.event_id for r in rows] rows, end_token = self._paginate_room_events_txn( txn, room_id, after_token, direction='f', limit=after_limit, + event_filter=event_filter, ) events_after = [r.event_id for r in rows] diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index c3bc94f56d..428e7fa36e 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore @@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore): def __init__(self, db_conn, hs): super(TransactionStore, self).__init__(db_conn, hs) - self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) + self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -271,6 +272,11 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + def _start_cleanup_transactions(self): + return run_as_background_process( + "cleanup_transactions", self._cleanup_transactions, + ) + def _cleanup_transactions(self): now = self._clock.time_msec() month_ago = now - 30 * 24 * 60 * 60 * 1000 diff --git a/synapse/util/async.py b/synapse/util/async.py index 5d0fb39130..a7094e2fb4 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import logging from contextlib import contextmanager @@ -156,54 +157,72 @@ def concurrently_execute(func, args, limit): class Linearizer(object): - """Linearizes access to resources based on a key. Useful to ensure only one - thing is happening at a time on a given resource. + """Limits concurrent access to resources based on a key. Useful to ensure + only a few things happen at a time on a given resource. Example: - with (yield linearizer.queue("test_key")): + with (yield limiter.queue("test_key")): # do some work. """ - def __init__(self, name=None, clock=None): + def __init__(self, name=None, max_count=1, clock=None): + """ + Args: + max_count(int): The maximum number of concurrent accesses + """ if name is None: self.name = id(self) else: self.name = name - self.key_to_defer = {} if not clock: from twisted.internet import reactor clock = Clock(reactor) self._clock = clock + self.max_count = max_count + + # key_to_defer is a map from the key to a 2 element list where + # the first element is the number of things executing, and + # the second element is an OrderedDict, where the keys are deferreds for the + # things blocked from executing. + self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - # If there is already a deferred in the queue, we pull it out so that - # we can wait on it later. - # Then we replace it with a deferred that we resolve *after* the - # context manager has exited. - # We only return the context manager after the previous deferred has - # resolved. - # This all has the net effect of creating a chain of deferreds that - # wait for the previous deferred before starting their work. - current_defer = self.key_to_defer.get(key) + entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) - new_defer = defer.Deferred() - self.key_to_defer[key] = new_defer + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When on of the things currently executing finishes it will callback + # this item so that it can continue executing. + if entry[0] >= self.max_count: + new_defer = defer.Deferred() + entry[1][new_defer] = 1 - if current_defer: logger.info( - "Waiting to acquire linearizer lock %r for key %r", self.name, key + "Waiting to acquire linearizer lock %r for key %r", self.name, key, ) try: - with PreserveLoggingContext(): - yield current_defer - except Exception: - logger.exception("Unexpected exception in Linearizer") - - logger.info("Acquired linearizer lock %r for key %r", self.name, - key) + yield make_deferred_yieldable(new_defer) + except Exception as e: + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + raise + + logger.info("Acquired linearizer lock %r for key %r", self.name, key) + entry[0] += 1 # if the code holding the lock completes synchronously, then it # will recursively run the next claimant on the list. That can @@ -213,15 +232,15 @@ class Linearizer(object): # In order to break the cycle, we add a cheeky sleep(0) here to # ensure that we fall back to the reactor between each iteration. # - # (There's no particular need for it to happen before we return - # the context manager, but it needs to happen while we hold the - # lock, and the context manager's exit code must be synchronous, - # so actually this is the only sensible place. + # (This needs to happen while we hold the lock, and the context manager's exit + # code must be synchronous, so this is the only sensible place.) yield self._clock.sleep(0) else: - logger.info("Acquired uncontended linearizer lock %r for key %r", - self.name, key) + logger.info( + "Acquired uncontended linearizer lock %r for key %r", self.name, key, + ) + entry[0] += 1 @contextmanager def _ctx_manager(): @@ -229,73 +248,15 @@ class Linearizer(object): yield finally: logger.info("Releasing linearizer lock %r for key %r", self.name, key) - with PreserveLoggingContext(): - new_defer.callback(None) - current_d = self.key_to_defer.get(key) - if current_d is new_defer: - self.key_to_defer.pop(key, None) - - defer.returnValue(_ctx_manager()) - - -class Limiter(object): - """Limits concurrent access to resources based on a key. Useful to ensure - only a few thing happen at a time on a given resource. - - Example: - - with (yield limiter.queue("test_key")): - # do some work. - - """ - def __init__(self, max_count): - """ - Args: - max_count(int): The maximum number of concurrent access - """ - self.max_count = max_count - - # key_to_defer is a map from the key to a 2 element list where - # the first element is the number of things executing - # the second element is a list of deferreds for the things blocked from - # executing. - self.key_to_defer = {} - - @defer.inlineCallbacks - def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, []]) - - # If the number of things executing is greater than the maximum - # then add a deferred to the list of blocked items - # When on of the things currently executing finishes it will callback - # this item so that it can continue executing. - if entry[0] >= self.max_count: - new_defer = defer.Deferred() - entry[1].append(new_defer) - - logger.info("Waiting to acquire limiter lock for key %r", key) - with PreserveLoggingContext(): - yield new_defer - logger.info("Acquired limiter lock for key %r", key) - else: - logger.info("Acquired uncontended limiter lock for key %r", key) - - entry[0] += 1 - - @contextmanager - def _ctx_manager(): - try: - yield - finally: - logger.info("Releasing limiter lock for key %r", key) # We've finished executing so check if there are any things # blocked waiting to execute and start one of them entry[0] -= 1 if entry[1]: - next_def = entry[1].pop(0) + (next_def, _) = entry[1].popitem(last=False) + # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): next_def.callback(None) elif entry[0] == 0: diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index f8a07df6b8..861c24809c 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -473,105 +473,101 @@ class CacheListDescriptor(_CacheDescriptorBase): @functools.wraps(self.orig) def wrapped(*args, **kwargs): - # If we're passed a cache_context then we'll want to call its invalidate() - # whenever we are invalidated + # If we're passed a cache_context then we'll want to call its + # invalidate() whenever we are invalidated invalidate_callback = kwargs.pop("on_invalidate", None) arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] list_args = arg_dict[self.list_name] - # cached is a dict arg -> deferred, where deferred results in a - # 2-tuple (`arg`, `result`) results = {} - cached_defers = {} - missing = [] + + def update_results_dict(res, arg): + results[arg] = res + + # list of deferreds to wait for + cached_defers = [] + + missing = set() # If the cache takes a single arg then that is used as the key, # otherwise a tuple is used. if num_args == 1: - def cache_get(arg): - return cache.get(arg, callback=invalidate_callback) + def arg_to_cache_key(arg): + return arg else: - key = list(keyargs) + keylist = list(keyargs) - def cache_get(arg): - key[self.list_pos] = arg - return cache.get(tuple(key), callback=invalidate_callback) + def arg_to_cache_key(arg): + keylist[self.list_pos] = arg + return tuple(keylist) for arg in list_args: try: - res = cache_get(arg) - + res = cache.get(arg_to_cache_key(arg), + callback=invalidate_callback) if not isinstance(res, ObservableDeferred): results[arg] = res elif not res.has_succeeded(): res = res.observe() - res.addCallback(lambda r, arg: (arg, r), arg) - cached_defers[arg] = res + res.addCallback(update_results_dict, arg) + cached_defers.append(res) else: results[arg] = res.get_result() except KeyError: - missing.append(arg) + missing.add(arg) if missing: + # we need an observable deferred for each entry in the list, + # which we put in the cache. Each deferred resolves with the + # relevant result for that key. + deferreds_map = {} + for arg in missing: + deferred = defer.Deferred() + deferreds_map[arg] = deferred + key = arg_to_cache_key(arg) + observable = ObservableDeferred(deferred) + cache.set(key, observable, callback=invalidate_callback) + + def complete_all(res): + # the wrapped function has completed. It returns a + # a dict. We can now resolve the observable deferreds in + # the cache and update our own result map. + for e in missing: + val = res.get(e, None) + deferreds_map[e].callback(val) + results[e] = val + + def errback(f): + # the wrapped function has failed. Invalidate any cache + # entries we're supposed to be populating, and fail + # their deferreds. + for e in missing: + key = arg_to_cache_key(e) + cache.invalidate(key) + deferreds_map[e].errback(f) + + # return the failure, to propagate to our caller. + return f + args_to_call = dict(arg_dict) - args_to_call[self.list_name] = missing + args_to_call[self.list_name] = list(missing) - ret_d = defer.maybeDeferred( + cached_defers.append(defer.maybeDeferred( logcontext.preserve_fn(self.function_to_call), **args_to_call - ) - - ret_d = ObservableDeferred(ret_d) - - # We need to create deferreds for each arg in the list so that - # we can insert the new deferred into the cache. - for arg in missing: - observer = ret_d.observe() - observer.addCallback(lambda r, arg: r.get(arg, None), arg) - - observer = ObservableDeferred(observer) - - if num_args == 1: - cache.set( - arg, observer, - callback=invalidate_callback - ) - - def invalidate(f, key): - cache.invalidate(key) - return f - observer.addErrback(invalidate, arg) - else: - key = list(keyargs) - key[self.list_pos] = arg - cache.set( - tuple(key), observer, - callback=invalidate_callback - ) - - def invalidate(f, key): - cache.invalidate(key) - return f - observer.addErrback(invalidate, tuple(key)) - - res = observer.observe() - res.addCallback(lambda r, arg: (arg, r), arg) - - cached_defers[arg] = res + ).addCallbacks(complete_all, errback)) if cached_defers: - def update_results_dict(res): - results.update(res) - return results - - return logcontext.make_deferred_yieldable(defer.gatherResults( - list(cached_defers.values()), + d = defer.gatherResults( + cached_defers, consumeErrors=True, - ).addCallback(update_results_dict).addErrback( + ).addCallbacks( + lambda _: results, unwrapFirstError - )) + ) + return logcontext.make_deferred_yieldable(d) else: return results @@ -625,7 +621,8 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal cache. Args: - cache (Cache): The underlying cache to use. + cached_method_name (str): The name of the single-item lookup method. + This is only used to find the cache to use. list_name (str): The name of the argument that is the list to use to do batch lookups in the cache. num_args (int): Number of arguments to use as the key in the cache diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 4abca91f6d..ce85b2ae11 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -16,6 +16,7 @@ import logging from collections import OrderedDict +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache logger = logging.getLogger(__name__) @@ -63,7 +64,10 @@ class ExpiringCache(object): return def f(): - self._prune_cache() + return run_as_background_process( + "prune_cache_%s" % self._cache_name, + self._prune_cache, + ) self._clock.looping_call(f, self._expiry_ms / 2) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 258655349b..f2bde74dc5 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -74,12 +74,14 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - result = { + changed_entities = { self._cache[k] for k in self._cache.islice( start=self._cache.bisect_right(stream_pos), ) } + result = changed_entities.intersection(entities) + self.metrics.inc_hits() else: result = set(entities) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 734331caaa..194da87639 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -17,20 +17,18 @@ import logging from twisted.internet import defer -from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) def user_left_room(distributor, user, room_id): - with PreserveLoggingContext(): - distributor.fire("user_left_room", user=user, room_id=room_id) + distributor.fire("user_left_room", user=user, room_id=room_id) def user_joined_room(distributor, user, room_id): - with PreserveLoggingContext(): - distributor.fire("user_joined_room", user=user, room_id=room_id) + distributor.fire("user_joined_room", user=user, room_id=room_id) class Distributor(object): @@ -44,9 +42,7 @@ class Distributor(object): model will do for today. """ - def __init__(self, suppress_failures=True): - self.suppress_failures = suppress_failures - + def __init__(self): self.signals = {} self.pre_registration = {} @@ -56,7 +52,6 @@ class Distributor(object): self.signals[name] = Signal( name, - suppress_failures=self.suppress_failures, ) if name in self.pre_registration: @@ -75,10 +70,18 @@ class Distributor(object): self.pre_registration[name].append(observer) def fire(self, name, *args, **kwargs): + """Dispatches the given signal to the registered observers. + + Runs the observers as a background process. Does not return a deferred. + """ if name not in self.signals: raise KeyError("%r does not have a signal named %s" % (self, name)) - return self.signals[name].fire(*args, **kwargs) + run_as_background_process( + name, + self.signals[name].fire, + *args, **kwargs + ) class Signal(object): @@ -91,9 +94,8 @@ class Signal(object): method into all of the observers. """ - def __init__(self, name, suppress_failures): + def __init__(self, name): self.name = name - self.suppress_failures = suppress_failures self.observers = [] def observe(self, observer): @@ -103,7 +105,6 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) - @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -121,22 +122,17 @@ class Signal(object): failure.type, failure.value, failure.getTracebackObject())) - if not self.suppress_failures: - return failure return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb) - with PreserveLoggingContext(): - deferreds = [ - do(observer) - for observer in self.observers - ] - - res = yield defer.gatherResults( - deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + deferreds = [ + run_in_background(do, o) + for o in self.observers + ] - defer.returnValue(res) + return make_deferred_yieldable(defer.gatherResults( + deferreds, consumeErrors=True, + )) def __repr__(self): return "<Signal name=%r>" % (self.name,) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index f6c7175f74..8dcae50b39 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -99,6 +99,17 @@ class ContextResourceUsage(object): self.db_sched_duration_sec = 0 self.evt_db_fetch_count = 0 + def __repr__(self): + return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', " + "db_txn_count='%r', db_txn_duration_sec='%r', " + "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % ( + self.ru_stime, + self.ru_utime, + self.db_txn_count, + self.db_txn_duration_sec, + self.db_sched_duration_sec, + self.evt_db_fetch_count,) + def __iadd__(self, other): """Add another ContextResourceUsage's stats to this one's. diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 6ba7107896..97f1267380 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -104,12 +104,19 @@ class Measure(object): logger.warn("Expected context. (%r)", self.name) return - usage = context.get_resource_usage() - self.start_usage - block_ru_utime.labels(self.name).inc(usage.ru_utime) - block_ru_stime.labels(self.name).inc(usage.ru_stime) - block_db_txn_count.labels(self.name).inc(usage.db_txn_count) - block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) - block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) + current = context.get_resource_usage() + usage = current - self.start_usage + try: + block_ru_utime.labels(self.name).inc(usage.ru_utime) + block_ru_stime.labels(self.name).inc(usage.ru_stime) + block_db_txn_count.labels(self.name).inc(usage.db_txn_count) + block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) + block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) + except ValueError: + logger.warn( + "Failed to save metrics! OLD: %r, NEW: %r", + self.start_usage, current + ) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) diff --git a/synapse/visibility.py b/synapse/visibility.py index 015c2bab37..d4680863d3 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -12,15 +12,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools + import logging import operator +from six import iteritems, itervalues +from six.moves import map + from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events.utils import prune_event -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.types import get_domain_from_id logger = logging.getLogger(__name__) @@ -72,19 +75,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, types=types, ) - forgotten = yield make_deferred_yieldable(defer.gatherResults([ - defer.maybeDeferred( - preserve_fn(store.who_forgot_in_room), - room_id, - ) - for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True)) - - # Set of membership event_ids that have been forgotten - event_id_forgotten = frozenset( - row["event_id"] for rows in forgotten for row in rows - ) - ignore_dict_content = yield store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id, ) @@ -173,10 +163,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, if membership is None: membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: - # XXX why do we do this? - # https://github.com/matrix-org/synapse/issues/3350 - if membership_event.event_id not in event_id_forgotten: - membership = membership_event.membership + membership = membership_event.membership # if the user was a member of the room at the time of the event, # they can see it. @@ -218,10 +205,161 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, return event # check each event: gives an iterable[None|EventBase] - filtered_events = itertools.imap(allowed, events) + filtered_events = map(allowed, events) # remove the None entries filtered_events = filter(operator.truth, filtered_events) # we turn it into a list before returning it. defer.returnValue(list(filtered_events)) + + +@defer.inlineCallbacks +def filter_events_for_server(store, server_name, events): + # Whatever else we do, we need to check for senders which have requested + # erasure of their data. + erased_senders = yield store.are_users_erased( + e.sender for e in events, + ) + + def redact_disallowed(event, state): + # if the sender has been gdpr17ed, always return a redacted + # copy of the event. + if erased_senders[event.sender]: + logger.info( + "Sender of %s has been erased, redacting", + event.event_id, + ) + return prune_event(event) + + # state will be None if we decided we didn't need to filter by + # room membership. + if not state: + return event + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + if visibility in ["invited", "joined"]: + # We now loop through all state events looking for + # membership states for the requesting server to determine + # if the server is either in the room or has been invited + # into the room. + for ev in itervalues(state): + if ev.type != EventTypes.Member: + continue + try: + domain = get_domain_from_id(ev.state_key) + except Exception: + continue + + if domain != server_name: + continue + + memtype = ev.membership + if memtype == Membership.JOIN: + return event + elif memtype == Membership.INVITE: + if visibility == "invited": + return event + else: + # server has no users in the room: redact + return prune_event(event) + + return event + + # Next lets check to see if all the events have a history visibility + # of "shared" or "world_readable". If thats the case then we don't + # need to check membership (as we know the server is in the room). + event_to_state_ids = yield store.get_state_ids_for_events( + frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + ) + ) + + visibility_ids = set() + for sids in itervalues(event_to_state_ids): + hist = sids.get((EventTypes.RoomHistoryVisibility, "")) + if hist: + visibility_ids.add(hist) + + # If we failed to find any history visibility events then the default + # is "shared" visiblity. + if not visibility_ids: + all_open = True + else: + event_map = yield store.get_events(visibility_ids) + all_open = all( + e.content.get("history_visibility") in (None, "shared", "world_readable") + for e in itervalues(event_map) + ) + + if all_open: + # all the history_visibility state affecting these events is open, so + # we don't need to filter by membership state. We *do* need to check + # for user erasure, though. + if erased_senders: + events = [ + redact_disallowed(e, None) + for e in events + ] + + defer.returnValue(events) + + # Ok, so we're dealing with events that have non-trivial visibility + # rules, so we need to also get the memberships of the room. + + # first, for each event we're wanting to return, get the event_ids + # of the history vis and membership state at those events. + event_to_state_ids = yield store.get_state_ids_for_events( + frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, None), + ) + ) + + # We only want to pull out member events that correspond to the + # server's domain. + # + # event_to_state_ids contains lots of duplicates, so it turns out to be + # cheaper to build a complete set of unique + # ((type, state_key), event_id) tuples, and then filter out the ones we + # don't want. + # + state_key_to_event_id_set = { + e + for key_to_eid in itervalues(event_to_state_ids) + for e in key_to_eid.items() + } + + def include(typ, state_key): + if typ != EventTypes.Member: + return True + + # we avoid using get_domain_from_id here for efficiency. + idx = state_key.find(":") + if idx == -1: + return False + return state_key[idx + 1:] == server_name + + event_map = yield store.get_events([ + e_id + for key, e_id in state_key_to_event_id_set + if include(key[0], key[1]) + ]) + + event_to_state = { + e_id: { + key: event_map[inner_e_id] + for key, inner_e_id in iteritems(key_to_eid) + if inner_e_id in event_map + } + for e_id, key_to_eid in iteritems(event_to_state_ids) + } + + defer.returnValue([ + redact_disallowed(e, event_to_state[e.event_id]) + for e in events + ]) |