Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py                           2
-rw-r--r--  synapse/api/constants.py                      3
-rw-r--r--  synapse/api/errors.py                         3
-rw-r--r--  synapse/app/synchrotron.py                    4
-rw-r--r--  synapse/appservice/api.py                     8
-rw-r--r--  synapse/crypto/keyring.py                     8
-rw-r--r--  synapse/federation/federation_base.py        21
-rw-r--r--  synapse/federation/federation_client.py       2
-rw-r--r--  synapse/federation/federation_server.py      55
-rw-r--r--  synapse/federation/send_queue.py              28
-rw-r--r--  synapse/federation/transport/server.py       12
-rw-r--r--  synapse/handlers/federation.py               109
-rw-r--r--  synapse/handlers/message.py                   86
-rw-r--r--  synapse/handlers/room_list.py                 38
-rw-r--r--  synapse/handlers/room_member.py               13
-rw-r--r--  synapse/handlers/sync.py                      23
-rw-r--r--  synapse/notifier.py                            1
-rw-r--r--  synapse/python_dependencies.py                24
-rw-r--r--  synapse/replication/http/send_event.py        18
-rw-r--r--  synapse/storage/event_federation.py           63
-rw-r--r--  synapse/storage/events.py                     49
-rw-r--r--  synapse/storage/room.py                        6
-rw-r--r--  synapse/storage/search.py                      2
-rw-r--r--  synapse/util/caches/response_cache.py         88
-rw-r--r--  synapse/util/caches/stream_change_cache.py     4
-rw-r--r--  synapse/util/file_consumer.py                  4
-rw-r--r--  synapse/util/logcontext.py                     1
27 files changed, 459 insertions, 216 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py

index 7f6090baf8..f31cb9a3cb 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py
@@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.27.3-rc2" +__version__ = "0.28.1" diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 489efb7f86..5baba43966 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py
@@ -16,6 +16,9 @@ """Contains constants from the specification.""" +# the "depth" field on events is limited to 2**63 - 1 +MAX_DEPTH = 2**63 - 1 + class Membership(object): diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index bee59e80dd..a9ff5576f3 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py
@@ -18,6 +18,7 @@ import logging import simplejson as json +from six import iteritems logger = logging.getLogger(__name__) @@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs): A dict representing the error response JSON. """ err = {"error": msg, "errcode": code} - for key, value in kwargs.iteritems(): + for key, value in iteritems(kwargs): err[key] = value return err diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 508b66613d..2fddcd935a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py
@@ -58,6 +58,8 @@ from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor from twisted.web.resource import NoResource +from six import iteritems + logger = logging.getLogger("synapse.app.synchrotron") @@ -211,7 +213,7 @@ class SynchrotronPresence(object): def get_currently_syncing_users(self): return [ - user_id for user_id, count in self.user_to_num_current_syncs.iteritems() + user_id for user_id, count in iteritems(self.user_to_num_current_syncs) if count > 0 ] diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 11e9c37c63..00efff1464 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py
@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind from synapse.api.errors import CodeMessageException from synapse.http.client import SimpleHttpClient from synapse.events.utils import serialize_event -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.caches.response_cache import ResponseCache from synapse.types import ThirdPartyInstanceID @@ -194,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient): defer.returnValue(None) key = (service.id, protocol) - result = self.protocol_meta_cache.get(key) - if not result: - result = self.protocol_meta_cache.set( - key, preserve_fn(_get)() - ) - return make_deferred_yieldable(result) + return self.protocol_meta_cache.wrap(key, _get) @defer.inlineCallbacks def push_bulk(self, service, events, txn_id=None): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 35f810b07b..fce83d445f 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py
@@ -352,7 +352,7 @@ class Keyring(object): logger.exception( "Unable to get key from %r: %s %s", perspective_name, - type(e).__name__, str(e.message), + type(e).__name__, str(e), ) defer.returnValue({}) @@ -384,7 +384,7 @@ class Keyring(object): logger.info( "Unable to get key %r for %r directly: %s %s", key_ids, server_name, - type(e).__name__, str(e.message), + type(e).__name__, str(e), ) if not keys: @@ -734,7 +734,7 @@ def _handle_key_deferred(verify_request): except IOError as e: logger.warn( "Got IOError when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), + server_name, type(e).__name__, str(e), ) raise SynapseError( 502, @@ -744,7 +744,7 @@ def _handle_key_deferred(verify_request): except Exception as e: logger.exception( "Got Exception when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), + server_name, type(e).__name__, str(e), ) raise SynapseError( 401, diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 79eaa31031..4cc98a3fe8 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py
@@ -14,7 +14,10 @@ # limitations under the License. import logging -from synapse.api.errors import SynapseError +import six + +from synapse.api.constants import MAX_DEPTH +from synapse.api.errors import SynapseError, Codes from synapse.crypto.event_signing import check_event_content_hash from synapse.events import FrozenEvent from synapse.events.utils import prune_event @@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False): FrozenEvent Raises: - SynapseError: if the pdu is missing required fields + SynapseError: if the pdu is missing required fields or is otherwise + not a valid matrix event """ # we could probably enforce a bunch of other fields here (room_id, sender, # origin, etc etc) - assert_params_in_request(pdu_json, ('event_id', 'type')) + assert_params_in_request(pdu_json, ('event_id', 'type', 'depth')) + + depth = pdu_json['depth'] + if not isinstance(depth, six.integer_types): + raise SynapseError(400, "Depth %r not an integer" % (depth, ), + Codes.BAD_JSON) + + if depth < 0: + raise SynapseError(400, "Depth too small", Codes.BAD_JSON) + elif depth > MAX_DEPTH: + raise SynapseError(400, "Depth too large", Codes.BAD_JSON) + event = FrozenEvent( pdu_json ) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
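Taken out of the hunk above, the new validation in event_from_pdu_json reduces to the check below; the helper name is illustrative only, the real code performs the check inline before building the FrozenEvent:

    import six

    from synapse.api.constants import MAX_DEPTH
    from synapse.api.errors import Codes, SynapseError

    def _check_depth(pdu_json):
        # 'depth' is now a required field and must be an integer in [0, MAX_DEPTH]
        depth = pdu_json['depth']
        if not isinstance(depth, six.integer_types):
            raise SynapseError(400, "Depth %r not an integer" % (depth, ),
                               Codes.BAD_JSON)
        if depth < 0:
            raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
        if depth > MAX_DEPTH:
            raise SynapseError(400, "Depth too large", Codes.BAD_JSON)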
index 38440da5b5..8e2c0c4cd2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -394,7 +394,7 @@ class FederationClient(FederationBase): seen_events = yield self.store.get_events(event_ids, allow_rejected=True) signed_events = seen_events.values() else: - seen_events = yield self.store.have_events(event_ids) + seen_events = yield self.store.have_seen_events(event_ids) signed_events = [] failed_to_fetch = set() diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e4ce037acf..247ddc89d5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,9 +31,10 @@ import synapse.metrics from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logutils import log_function +from six import iteritems + # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. TRANSACTION_CONCURRENCY_LIMIT = 10 @@ -212,16 +214,17 @@ class FederationServer(FederationBase): if not in_room: raise AuthError(403, "Host not in room.") - result = self._state_resp_cache.get((room_id, event_id)) - if not result: - with (yield self._server_linearizer.queue((origin, room_id))): - d = self._state_resp_cache.set( - (room_id, event_id), - preserve_fn(self._on_context_state_request_compute)(room_id, event_id) - ) - resp = yield make_deferred_yieldable(d) - else: - resp = yield make_deferred_yieldable(result) + # we grab the linearizer to protect ourselves from servers which hammer + # us. In theory we might already have the response to this query + # in the cache so we could return it without waiting for the linearizer + # - but that's non-trivial to get right, and anyway somewhat defeats + # the point of the linearizer. + with (yield self._server_linearizer.queue((origin, room_id))): + resp = yield self._state_resp_cache.wrap( + (room_id, event_id), + self._on_context_state_request_compute, + room_id, event_id, + ) defer.returnValue((200, resp)) @@ -425,9 +428,9 @@ class FederationServer(FederationBase): "Claimed one-time-keys: %s", ",".join(( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in json_result.iteritems() - for device_id, device_keys in user_keys.iteritems() - for key_id, _ in device_keys.iteritems() + for user_id, user_keys in iteritems(json_result) + for device_id, device_keys in iteritems(user_keys) + for key_id, _ in iteritems(device_keys) )), ) @@ -494,13 +497,33 @@ class FederationServer(FederationBase): def _handle_received_pdu(self, origin, pdu): """ Process a PDU received in a federation /send/ transaction. + If the event is invalid, then this method throws a FederationError. + (The error will then be logged and sent back to the sender (which + probably won't do anything with it), and other events in the + transaction will be processed as normal). + + It is likely that we'll then receive other events which refer to + this rejected_event in their prev_events, etc. When that happens, + we'll attempt to fetch the rejected event again, which will presumably + fail, so those second-generation events will also get rejected. + + Eventually, we get to the point where there are more than 10 events + between any new events and the original rejected event. Since we + only try to backfill 10 events deep on received pdu, we then accept the + new event, possibly introducing a discontinuity in the DAG, with new + forward extremities, so normal service is approximately returned, + until we try to backfill across the discontinuity. 
+ Args: origin (str): server which sent the pdu pdu (FrozenEvent): received pdu Returns (Deferred): completes with None - Raises: FederationError if the signatures / hash do not match - """ + + Raises: FederationError if the signatures / hash do not match, or + if the event was unacceptable for any other reason (eg, too large, + too many prev_events, couldn't find the prev_events) + """ # check that it's actually being sent from a valid destination to # workaround bug #1753 in 0.18.5 and 0.18.6 if origin != get_domain_from_id(pdu.event_id): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 945832283f..0f0c687b37 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -35,11 +35,13 @@ from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure import synapse.metrics -from sortedcontainers import SortedDict +from blist import sorteddict from collections import namedtuple import logging +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -56,19 +58,19 @@ class FederationRemoteSendQueue(object): self.is_mine_id = hs.is_mine_id self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = SortedDict() # Stream position -> user_id + self.presence_changed = sorteddict() # Stream position -> user_id self.keyed_edu = {} # (destination, key) -> EDU - self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) + self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) - self.edus = SortedDict() # stream position -> Edu + self.edus = sorteddict() # stream position -> Edu - self.failures = SortedDict() # stream position -> (destination, Failure) + self.failures = sorteddict() # stream position -> (destination, Failure) - self.device_messages = SortedDict() # stream position -> destination + self.device_messages = sorteddict() # stream position -> destination self.pos = 1 - self.pos_time = SortedDict() + self.pos_time = sorteddict() # EVERYTHING IS SAD. In particular, python only makes new scopes when # we make a new function, so we need to make a new function so the inner @@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object): user_ids = set( user_id - for uids in self.presence_changed.itervalues() + for uids in itervalues(self.presence_changed) for user_id in uids ) @@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object): # stream position. keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} - for ((destination, edu_key), pos) in keyed_edus.iteritems(): + for ((destination, edu_key), pos) in iteritems(keyed_edus): rows.append((pos, KeyedEduRow( key=edu_key, edu=self.keyed_edu[(destination, edu_key)], @@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object): j = keys.bisect_right(to_token) + 1 device_messages = {self.device_messages[k]: k for k in keys[i:j]} - for (destination, pos) in device_messages.iteritems(): + for (destination, pos) in iteritems(device_messages): rows.append((pos, DeviceRow( destination=destination, ))) @@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows): if buff.presence: transaction_queue.send_presence(buff.presence) - for destination, edu_map in buff.keyed_edus.iteritems(): + for destination, edu_map in iteritems(buff.keyed_edus): for key, edu in edu_map.items(): transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=key, ) - for destination, edu_list in buff.edus.iteritems(): + for destination, edu_list in iteritems(buff.edus): for edu in edu_list: transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=None, ) - for destination, failure_list in buff.failures.iteritems(): + for destination, failure_list in iteritems(buff.failures): for failure in failure_list: transaction_queue.send_failure(destination, failure) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 4c94d5a36c..ff0656df3e 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -94,12 +94,6 @@ class Authenticator(object): "signatures": {}, } - if ( - self.federation_domain_whitelist is not None and - self.server_name not in self.federation_domain_whitelist - ): - raise FederationDeniedError(self.server_name) - if content is not None: json_request["content"] = content @@ -138,6 +132,12 @@ class Authenticator(object): json_request["origin"] = origin json_request["signatures"].setdefault(origin, {})[key] = sig + if ( + self.federation_domain_whitelist is not None and + origin not in self.federation_domain_whitelist + ): + raise FederationDeniedError(origin) + if not json_request["signatures"]: raise NoAuthenticationError( 401, "Missing Authorization headers", Codes.UNAUTHORIZED, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d71..ae7e0d6da2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -15,8 +15,14 @@ # limitations under the License. """Contains handlers for federation events.""" + +import httplib +import itertools +import logging + from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json +from twisted.internet import defer from unpaddedbase64 import decode_base64 from ._base import BaseHandler @@ -43,10 +49,6 @@ from synapse.util.retryutils import NotRetryingDestination from synapse.util.distributor import user_joined_room -from twisted.internet import defer - -import itertools -import logging logger = logging.getLogger(__name__) @@ -115,6 +117,19 @@ class FederationHandler(BaseHandler): logger.debug("Already seen pdu %s", pdu.event_id) return + # do some initial sanity-checking of the event. In particular, make + # sure it doesn't have hundreds of prev_events or auth_events, which + # could cause a huge state resolution or cascade of event fetches. + try: + self._sanity_check_event(pdu) + except SynapseError as err: + raise FederationError( + "ERROR", + err.code, + err.msg, + affected=pdu.event_id, + ) + # If we are currently in the process of joining this room, then we # queue up events for later processing. if pdu.room_id in self.room_queues: @@ -149,10 +164,6 @@ class FederationHandler(BaseHandler): auth_chain = [] - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - fetch_state = False # Get missing pdus if necessary. @@ -168,7 +179,7 @@ class FederationHandler(BaseHandler): ) prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this @@ -196,8 +207,7 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.iterkeys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: logger.info( @@ -248,8 +258,7 @@ class FederationHandler(BaseHandler): min_depth (int): Minimum depth of events to return. """ # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return @@ -361,9 +370,7 @@ class FederationHandler(BaseHandler): if auth_chain: event_ids |= {e.event_id for e in auth_chain} - seen_ids = set( - (yield self.store.have_events(event_ids)).keys() - ) + seen_ids = yield self.store.have_seen_events(event_ids) if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication @@ -527,9 +534,16 @@ class FederationHandler(BaseHandler): def backfill(self, dest, room_id, limit, extremities): """ Trigger a backfill request to `dest` for the given `room_id` - This will attempt to get more events from the remote. This may return - be successfull and still return no events if the other side has no new - events to offer. + This will attempt to get more events from the remote. If the other side + has no new events to offer, this will return an empty list. + + As the events are received, we check their signatures, and also do some + sanity-checking on them. If any of the backfilled events are invalid, + this method throws a SynapseError. 
+ + TODO: make this more useful to distinguish failures of the remote + server from invalid events (there is probably no point in trying to + re-fetch invalid events from every other HS in the room.) """ if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") @@ -541,6 +555,16 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # ideally we'd sanity check the events here for excess prev_events etc, + # but it's hard to reject events at this point without completely + # breaking backfill in the same way that it is currently broken by + # events whose signature we cannot verify (#3121). + # + # So for now we accept the events anyway. #3124 tracks this. + # + # for ev in events: + # self._sanity_check_event(ev) + # Don't bother processing events we already have. seen_events = yield self.store.have_events_in_timeline( set(e.event_id for e in events) @@ -633,7 +657,7 @@ class FederationHandler(BaseHandler): failed_to_fetch = missing_auth - set(auth_events) - seen_events = yield self.store.have_events( + seen_events = yield self.store.have_seen_events( set(auth_events.keys()) | set(state_events.keys()) ) @@ -843,6 +867,38 @@ class FederationHandler(BaseHandler): defer.returnValue(False) + def _sanity_check_event(self, ev): + """ + Do some early sanity checks of a received event + + In particular, checks it doesn't have an excessive number of + prev_events or auth_events, which could cause a huge state resolution + or cascade of event fetches. + + Args: + ev (synapse.events.EventBase): event to be checked + + Returns: None + + Raises: + SynapseError if the event does not pass muster + """ + if len(ev.prev_events) > 20: + logger.warn("Rejecting event %s which has %i prev_events", + ev.event_id, len(ev.prev_events)) + raise SynapseError( + httplib.BAD_REQUEST, + "Too many prev_events", + ) + + if len(ev.auth_events) > 10: + logger.warn("Rejecting event %s which has %i auth_events", + ev.event_id, len(ev.auth_events)) + raise SynapseError( + httplib.BAD_REQUEST, + "Too many auth_events", + ) + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. @@ -1736,7 +1792,8 @@ class FederationHandler(BaseHandler): event_key = None if event_auth_events - current_state: - have_events = yield self.store.have_events( + # TODO: can we use store.have_seen_events here instead? + have_events = yield self.store.get_seen_events_with_rejections( event_auth_events - current_state ) else: @@ -1759,12 +1816,12 @@ class FederationHandler(BaseHandler): origin, event.room_id, event.event_id ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in remote_auth_chain] ) for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): + if e.event_id in seen_remotes: continue if e.event_id == event.event_id: @@ -1791,7 +1848,7 @@ class FederationHandler(BaseHandler): except AuthError: pass - have_events = yield self.store.have_events( + have_events = yield self.store.get_seen_events_with_rejections( [e_id for e_id, _ in event.auth_events] ) seen_events = set(have_events.keys()) @@ -1876,13 +1933,13 @@ class FederationHandler(BaseHandler): local_auth_chain, ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in result["auth_chain"]] ) # 3. Process any remote auth chain events we haven't seen. 
for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): + if ev.event_id in seen_remotes: continue if ev.event_id == event.event_id: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 54cd691f91..53beb2b9ab 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@ from twisted.internet import defer, reactor from twisted.python.failure import Failure -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, MAX_DEPTH from synapse.api.errors import AuthError, Codes, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event @@ -37,7 +37,6 @@ from ._base import BaseHandler from canonicaljson import encode_canonical_json import logging -import random import simplejson logger = logging.getLogger(__name__) @@ -433,7 +432,7 @@ class EventCreationHandler(object): @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, - prev_event_ids=None): + prev_events_and_hashes=None): """ Given a dict from a client, create a new event. @@ -447,7 +446,13 @@ class EventCreationHandler(object): event_dict (dict): An entire event token_id (str) txn_id (str) - prev_event_ids (list): The prev event ids to use when creating the event + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. Returns: Tuple of created event (FrozenEvent), Context @@ -485,7 +490,7 @@ class EventCreationHandler(object): event, context = yield self.create_new_client_event( builder=builder, requester=requester, - prev_event_ids=prev_event_ids, + prev_events_and_hashes=prev_events_and_hashes, ) defer.returnValue((event, context)) @@ -588,39 +593,48 @@ class EventCreationHandler(object): @measure_func("create_new_client_event") @defer.inlineCallbacks - def create_new_client_event(self, builder, requester=None, prev_event_ids=None): - if prev_event_ids: - prev_events = yield self.store.add_event_hashes(prev_event_ids) - prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) - depth = prev_max_depth + 1 - else: - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) + def create_new_client_event(self, builder, requester=None, + prev_events_and_hashes=None): + """Create a new event for a local client - # We want to limit the max number of prev events we point to in our - # new event - if len(latest_ret) > 10: - # Sort by reverse depth, so we point to the most recent. - latest_ret.sort(key=lambda a: -a[2]) - new_latest_ret = latest_ret[:5] - - # We also randomly point to some of the older events, to make - # sure that we don't completely ignore the older events. - if latest_ret[5:]: - sample_size = min(5, len(latest_ret[5:])) - new_latest_ret.extend(random.sample(latest_ret[5:], sample_size)) - latest_ret = new_latest_ret - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 + Args: + builder (EventBuilder): + + requester (synapse.types.Requester|None): + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. 
+ + Returns: + Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)] + """ + + if prev_events_and_hashes is not None: + assert len(prev_events_and_hashes) <= 10, \ + "Attempting to create an event with %i prev_events" % ( + len(prev_events_and_hashes), + ) + else: + prev_events_and_hashes = \ + yield self.store.get_prev_events_for_room(builder.room_id) + + if prev_events_and_hashes: + depth = max([d for _, _, d in prev_events_and_hashes]) + 1 + # we cap depth of generated events, to ensure that they are not + # rejected by other servers (and so that they can be persisted in + # the db) + depth = min(depth, MAX_DEPTH) + else: + depth = 1 - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in prev_events_and_hashes + ] builder.prev_events = prev_events builder.depth = depth diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
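With the extremity selection moved into the store (see get_prev_events_for_room in storage/event_federation.py below), the depth calculation above amounts to taking the deepest prev event, adding one, and clamping to MAX_DEPTH so the generated event is not rejected by other servers or by our own database. A minimal sketch with hypothetical extremity tuples:

    from synapse.api.constants import MAX_DEPTH

    # each forward extremity is a (event_id, hashes, depth) tuple
    prev_events_and_hashes = [
        ("$abc:example.com", {"sha256": "hash1"}, 41),
        ("$def:example.com", {"sha256": "hash2"}, 57),
    ]

    if prev_events_and_hashes:
        depth = max(d for _, _, d in prev_events_and_hashes) + 1
        depth = min(depth, MAX_DEPTH)   # cap the depth of generated events
    else:
        depth = 1

    prev_events = [
        (event_id, hashes) for event_id, hashes, _ in prev_events_and_hashes
    ]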
index 8028d793c2..add3f9b009 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py
@@ -20,7 +20,6 @@ from ._base import BaseHandler from synapse.api.constants import ( EventTypes, JoinRules, ) -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.async import concurrently_execute from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache @@ -78,18 +77,11 @@ class RoomListHandler(BaseHandler): ) key = (limit, since_token, network_tuple) - result = self.response_cache.get(key) - if not result: - logger.info("No cached result, calculating one.") - result = self.response_cache.set( - key, - preserve_fn(self._get_public_room_list)( - limit, since_token, network_tuple=network_tuple - ) - ) - else: - logger.info("Using cached deferred result.") - return make_deferred_yieldable(result) + return self.response_cache.wrap( + key, + self._get_public_room_list, + limit, since_token, network_tuple=network_tuple, + ) @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, @@ -423,18 +415,14 @@ class RoomListHandler(BaseHandler): server_name, limit, since_token, include_all_networks, third_party_instance_id, ) - result = self.remote_response_cache.get(key) - if not result: - result = self.remote_response_cache.set( - key, - repl_layer.get_public_rooms( - server_name, limit=limit, since_token=since_token, - search_filter=search_filter, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) - ) - return result + return self.remote_response_cache.wrap( + key, + repl_layer.get_public_rooms, + server_name, limit=limit, since_token=since_token, + search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, + ) class RoomListNextBatch(namedtuple("RoomListNextBatch", ( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index c45142d38d..714583f1d5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -149,7 +149,7 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def _local_membership_update( self, requester, target, room_id, membership, - prev_event_ids, + prev_events_and_hashes, txn_id=None, ratelimit=True, content=None, @@ -175,7 +175,7 @@ class RoomMemberHandler(object): }, token_id=requester.access_token_id, txn_id=txn_id, - prev_event_ids=prev_event_ids, + prev_events_and_hashes=prev_events_and_hashes, ) # Check if this event matches the previous membership event for the user. @@ -314,7 +314,12 @@ class RoomMemberHandler(object): 403, "Invites have been disabled on this server", ) - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + prev_events_and_hashes = yield self.store.get_prev_events_for_room( + room_id, + ) + latest_event_ids = ( + event_id for (event_id, _, _) in prev_events_and_hashes + ) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, ) @@ -403,7 +408,7 @@ class RoomMemberHandler(object): membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, - prev_event_ids=latest_event_ids, + prev_events_and_hashes=prev_events_and_hashes, content=content, ) defer.returnValue(res) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 06d17ab20c..b52e4c2aff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute -from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user @@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ to tell if room needs to be part of the sync result. """ return bool(self.events) + __bool__ = __nonzero__ # python3 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ @@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ # nb the notification count does not, er, count: if there's nothing # else in the result, we don't need to send it. ) + __bool__ = __nonzero__ # python3 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ @@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ or self.state or self.account_data ) + __bool__ = __nonzero__ # python3 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ @@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ def __nonzero__(self): """Invited rooms should always be reported to the client""" return True + __bool__ = __nonzero__ # python3 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ @@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ def __nonzero__(self): return bool(self.join or self.invite or self.leave) + __bool__ = __nonzero__ # python3 class DeviceLists(collections.namedtuple("DeviceLists", [ @@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [ def __nonzero__(self): return bool(self.changed or self.left) + __bool__ = __nonzero__ # python3 class SyncResult(collections.namedtuple("SyncResult", [ @@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.device_lists or self.groups ) + __bool__ = __nonzero__ # python3 class SyncHandler(object): @@ -180,15 +187,11 @@ class SyncHandler(object): Returns: A Deferred SyncResult. """ - result = self.response_cache.get(sync_config.request_key) - if not result: - result = self.response_cache.set( - sync_config.request_key, - preserve_fn(self._wait_for_sync_for_user)( - sync_config, since_token, timeout, full_state - ) - ) - return make_deferred_yieldable(result) + return self.response_cache.wrap( + sync_config.request_key, + self._wait_for_sync_for_user, + sync_config, since_token, timeout, full_state, + ) @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, since_token, timeout, diff --git a/synapse/notifier.py b/synapse/notifier.py
index ef042681bc..0e40a4aad6 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py
@@ -144,6 +144,7 @@ class _NotifierUserStream(object): class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): def __nonzero__(self): return bool(self.events) + __bool__ = __nonzero__ # python3 class Notifier(object): diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index f9596bddaf..711cbb6c50 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py
@@ -1,5 +1,6 @@ # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,15 +19,31 @@ from distutils.version import LooseVersion logger = logging.getLogger(__name__) +# this dict maps from python package name to a list of modules we expect it to +# provide. +# +# the key is a "requirement specifier", as used as a parameter to `pip +# install`[1], or an `install_requires` argument to `setuptools.setup` [2]. +# +# the value is a sequence of strings; each entry should be the name of the +# python module, optionally followed by a version assertion which can be either +# ">=<ver>" or "==<ver>". +# +# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers. +# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies REQUIREMENTS = { "jsonschema>=2.5.1": ["jsonschema>=2.5.1"], "frozendict>=0.4": ["frozendict"], "unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"], - "canonicaljson>=1.0.0": ["canonicaljson>=1.0.0"], + "canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"], "signedjson>=1.0.0": ["signedjson>=1.0.0"], "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], - "Twisted>=16.0.0": ["twisted>=16.0.0"], + + # we break under Twisted 18.4 + # (https://github.com/matrix-org/synapse/issues/3135) + "Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"], + "pyopenssl>=0.14": ["OpenSSL>=0.14"], "pyyaml": ["yaml"], "pyasn1": ["pyasn1"], @@ -34,11 +51,12 @@ REQUIREMENTS = { "bcrypt": ["bcrypt>=3.1.0"], "pillow": ["PIL"], "pydenticon": ["pydenticon"], + "blist": ["blist"], "pysaml2>=3.0.0": ["saml2>=3.0.0"], - "sortedcontainers": ["sortedcontainers"], "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], + "six": ["six"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index c6a6551d24..a9baa2c1c3 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py
@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.util.async import sleep from synapse.util.caches.response_cache import ResponseCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.metrics import Measure from synapse.types import Requester, UserID @@ -118,17 +117,12 @@ class ReplicationSendEventRestServlet(RestServlet): self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) def on_PUT(self, request, event_id): - result = self.response_cache.get(event_id) - if not result: - result = self.response_cache.set( - event_id, - self._handle_request(request) - ) - else: - logger.warn("Returning cached response") - return make_deferred_yieldable(result) - - @preserve_fn + return self.response_cache.wrap( + event_id, + self._handle_request, + request + ) + @defer.inlineCallbacks def _handle_request(self, request): with Measure(self.clock, "repl_send_event_parse"): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 00ee82d300..8fbf7ffba7 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import random from twisted.internet import defer @@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 import logging -from Queue import PriorityQueue, Empty +from six.moves.queue import PriorityQueue, Empty + +from six.moves import range logger = logging.getLogger(__name__) @@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, front_list = list(front) chunks = [ front_list[x:x + 100] - for x in xrange(0, len(front), 100) + for x in range(0, len(front), 100) ] for chunk in chunks: txn.execute( @@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, retcol="event_id", ) + @defer.inlineCallbacks + def get_prev_events_for_room(self, room_id): + """ + Gets a subset of the current forward extremities in the given room. + + Limits the result to 10 extremities, so that we can avoid creating + events which refer to hundreds of prev_events. + + Args: + room_id (str): room_id + + Returns: + Deferred[list[(str, dict[str, str], int)]] + for each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + """ + res = yield self.get_latest_event_ids_and_hashes_in_room(room_id) + if len(res) > 10: + # Sort by reverse depth, so we point to the most recent. + res.sort(key=lambda a: -a[2]) + + # we use half of the limit for the actual most recent events, and + # the other half to randomly point to some of the older events, to + # make sure that we don't completely ignore the older events. + res = res[0:5] + random.sample(res[5:], 5) + + defer.returnValue(res) + def get_latest_event_ids_and_hashes_in_room(self, room_id): + """ + Gets the current forward extremities in the given room + + Args: + room_id (str): room_id + + Returns: + Deferred[list[(str, dict[str, str], int)]] + for each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + """ + return self.runInteraction( "get_latest_event_ids_and_hashes_in_room", self._get_latest_event_ids_and_hashes_in_room, @@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, room_id, ) - @defer.inlineCallbacks - def get_max_depth_of_events(self, event_ids): - sql = ( - "SELECT MAX(depth) FROM events WHERE event_id IN (%s)" - ) % (",".join(["?"] * len(event_ids)),) - - rows = yield self._execute( - "get_max_depth_of_events", None, - sql, *event_ids - ) - - if rows: - defer.returnValue(rows[0][0]) - else: - defer.returnValue(1) - def _get_min_depth_interaction(self, txn, room_id): min_depth = self._simple_select_one_onecol_txn( txn, diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index da44b52fd6..5fe4a0e56c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py
@@ -16,6 +16,7 @@ from collections import OrderedDict, deque, namedtuple from functools import wraps +import itertools import logging import simplejson as json @@ -1320,13 +1321,49 @@ class EventsStore(EventsWorkerStore): defer.returnValue(set(r["event_id"] for r in rows)) - def have_events(self, event_ids): + @defer.inlineCallbacks + def have_seen_events(self, event_ids): """Given a list of event ids, check if we have already processed them. + Args: + event_ids (iterable[str]): + Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + + Returns: + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. """ if not event_ids: return defer.succeed({}) @@ -1348,9 +1385,7 @@ class EventsStore(EventsWorkerStore): return res - return self.runInteraction( - "have_events", f, - ) + return self.runInteraction("get_rejection_reasons", f) @defer.inlineCallbacks def count_daily_messages(self): diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 740c036975..ea6a189185 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore): # Convert the IDs to MXC URIs for media_id in local_mxcs: - local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id)) + local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id)) for hostname, media_id in remote_mxcs: remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id)) @@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore): while next_token: sql = """ SELECT stream_ordering, json FROM events - JOIN event_json USING (event_id) + JOIN event_json USING (room_id, event_id) WHERE room_id = ? AND stream_ordering < ? AND contains_url = ? AND outlier = ? @@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore): if matches: hostname = matches.group(1) media_id = matches.group(2) - if hostname == self.hostname: + if hostname == self.hs.hostname: local_media_mxcs.append(media_id) else: remote_media_mxcs.append((hostname, media_id)) diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 426cbe6e1a..6ba3e59889 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT stream_ordering, event_id, room_id, type, json, " " origin_server_ts FROM events" - " JOIN event_json USING (event_id)" + " JOIN event_json USING (room_id, event_id)" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 066fa423fd..7f79333e96 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py
@@ -12,9 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + +from twisted.internet import defer from synapse.util.async import ObservableDeferred from synapse.util.caches import metrics as cache_metrics +from synapse.util.logcontext import make_deferred_yieldable, run_in_background + +logger = logging.getLogger(__name__) class ResponseCache(object): @@ -31,6 +37,7 @@ class ResponseCache(object): self.clock = hs.get_clock() self.timeout_sec = timeout_ms / 1000. + self._name = name self._metrics = cache_metrics.register_cache( "response_cache", size_callback=lambda: self.size(), @@ -43,15 +50,21 @@ class ResponseCache(object): def get(self, key): """Look up the given key. - Returns a deferred which doesn't follow the synapse logcontext rules, - so you'll probably want to make_deferred_yieldable it. + Can return either a new Deferred (which also doesn't follow the synapse + logcontext rules), or, if the request has completed, the actual + result. You will probably want to make_deferred_yieldable the result. + + If there is no entry for the key, returns None. It is worth noting that + this means there is no way to distinguish a completed result of None + from an absent cache entry. Args: - key (str): + key (hashable): Returns: - twisted.internet.defer.Deferred|None: None if there is no entry - for this key; otherwise a deferred result. + twisted.internet.defer.Deferred|None|E: None if there is no entry + for this key; otherwise either a deferred result or the result + itself. """ result = self.pending_result_cache.get(key) if result is not None: @@ -68,19 +81,17 @@ class ResponseCache(object): you should wrap normal synapse deferreds with logcontext.run_in_background). - Returns a new Deferred which also doesn't follow the synapse logcontext - rules, so you will want to make_deferred_yieldable it - - (TODO: before using this more widely, it might make sense to refactor - it and get() so that they do the necessary wrapping rather than having - to do it everywhere ResponseCache is used.) + Can return either a new Deferred (which also doesn't follow the synapse + logcontext rules), or, if *deferred* was already complete, the actual + result. You will probably want to make_deferred_yieldable the result. Args: - key (str): - deferred (twisted.internet.defer.Deferred): + key (hashable): + deferred (twisted.internet.defer.Deferred[T): Returns: - twisted.internet.defer.Deferred + twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual + result. """ result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result @@ -97,3 +108,52 @@ class ResponseCache(object): result.addBoth(remove) return result.observe() + + def wrap(self, key, callback, *args, **kwargs): + """Wrap together a *get* and *set* call, taking care of logcontexts + + First looks up the key in the cache, and if it is present makes it + follow the synapse logcontext rules and returns it. + + Otherwise, makes a call to *callback(*args, **kwargs)*, which should + follow the synapse logcontext rules, and adds the result to the cache. 
+ + Example usage: + + @defer.inlineCallbacks + def handle_request(request): + # etc + defer.returnValue(result) + + result = yield response_cache.wrap( + key, + handle_request, + request, + ) + + Args: + key (hashable): key to get/set in the cache + + callback (callable): function to call if the key is not found in + the cache + + *args: positional parameters to pass to the callback, if it is used + + **kwargs: named paramters to pass to the callback, if it is used + + Returns: + twisted.internet.defer.Deferred: yieldable result + """ + result = self.get(key) + if not result: + logger.info("[%s]: no cached result for [%s], calculating new one", + self._name, key) + d = run_in_background(callback, *args, **kwargs) + result = self.set(key, d) + elif not isinstance(result, defer.Deferred) or result.called: + logger.info("[%s]: using completed cached result for [%s]", + self._name, key) + else: + logger.info("[%s]: using incomplete cached result for [%s]", + self._name, key) + return make_deferred_yieldable(result) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2ff46090a6..941d873ab8 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py
@@ -16,7 +16,7 @@ from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR -from sortedcontainers import SortedDict +from blist import sorteddict import logging @@ -35,7 +35,7 @@ class StreamChangeCache(object): def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): self._max_size = int(max_size * CACHE_SIZE_FACTOR) self._entity_to_key = {} - self._cache = SortedDict() + self._cache = sorteddict() self._earliest_known_stream_pos = current_stream_pos self.name = name self.metrics = register_cache(self.name, self._cache) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 90a2608d6f..3c8a165331 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py
@@ -17,7 +17,7 @@ from twisted.internet import threads, reactor from synapse.util.logcontext import make_deferred_yieldable, preserve_fn -import Queue +from six.moves import queue class BackgroundFileConsumer(object): @@ -49,7 +49,7 @@ class BackgroundFileConsumer(object): # Queue of slices of bytes to be written. When producer calls # unregister a final None is sent. - self._bytes_queue = Queue.Queue() + self._bytes_queue = queue.Queue() # Deferred that is resolved when finished writing self._finished_deferred = None diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d660ec785b..d59adc236e 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py
@@ -92,6 +92,7 @@ class LoggingContext(object): def __nonzero__(self): return False + __bool__ = __nonzero__ # python3 sentinel = Sentinel()