diff options
Diffstat (limited to 'synapse')
-rwxr-xr-x | synapse/app/synctl.py | 4 | ||||
-rw-r--r-- | synapse/config/consent_config.py | 3 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 65 | ||||
-rw-r--r-- | synapse/metrics/__init__.py | 9 | ||||
-rw-r--r-- | synapse/python_dependencies.py | 3 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 4 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 26 | ||||
-rw-r--r-- | synapse/storage/state.py | 4 | ||||
-rw-r--r-- | synapse/util/caches/__init__.py | 10 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 4 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 57 |
11 files changed, 92 insertions, 97 deletions
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 712dfa870e..56ae086128 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -171,6 +171,10 @@ def main(): if cache_factor: os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) + cache_factors = config.get("synctl_cache_factors", {}) + for cache_name, factor in cache_factors.iteritems(): + os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) + worker_configfiles = [] if options.worker: start_stop_synapse = False diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py index 8f6ed73328..e22c731aad 100644 --- a/synapse/config/consent_config.py +++ b/synapse/config/consent_config.py @@ -18,6 +18,9 @@ from ._base import Config DEFAULT_CONFIG = """\ # User Consent configuration # +# for detailed instructions, see +# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md +# # Parts of this section are required if enabling the 'consent' resource under # 'listeners', in particular 'template_dir' and 'version'. # diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 3dcc629d44..1d5c0f3797 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure from synapse.metrics import LaterGauge -from blist import sorteddict +from sortedcontainers import SortedDict from collections import namedtuple import logging @@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object): self.is_mine_id = hs.is_mine_id self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = sorteddict() # Stream position -> user_id + self.presence_changed = SortedDict() # Stream position -> user_id self.keyed_edu = {} # (destination, key) -> EDU - self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) + self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) - self.edus = sorteddict() # stream position -> Edu + self.edus = SortedDict() # stream position -> Edu - self.failures = sorteddict() # stream position -> (destination, Failure) + self.failures = SortedDict() # stream position -> (destination, Failure) - self.device_messages = sorteddict() # stream position -> destination + self.device_messages = SortedDict() # stream position -> destination self.pos = 1 - self.pos_time = sorteddict() + self.pos_time = SortedDict() # EVERYTHING IS SAD. In particular, python only makes new scopes when # we make a new function, so we need to make a new function so the inner @@ -75,7 +75,7 @@ class FederationRemoteSendQueue(object): # changes. ARGH. def register(name, queue): LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), - "", lambda: len(queue)) + "", [], lambda: len(queue)) for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", @@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object): now = self.clock.time_msec() keys = self.pos_time.keys() - time = keys.bisect_left(now - FIVE_MINUTES_AGO) + time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO) if not keys[:time]: return @@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object): with Measure(self.clock, "send_queue._clear"): # Delete things out of presence maps keys = self.presence_changed.keys() - i = keys.bisect_left(position_to_delete) + i = self.presence_changed.bisect_left(position_to_delete) for key in keys[:i]: del self.presence_changed[key] @@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object): # Delete things out of keyed edus keys = self.keyed_edu_changed.keys() - i = keys.bisect_left(position_to_delete) + i = self.keyed_edu_changed.bisect_left(position_to_delete) for key in keys[:i]: del self.keyed_edu_changed[key] @@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object): # Delete things out of edu map keys = self.edus.keys() - i = keys.bisect_left(position_to_delete) + i = self.edus.bisect_left(position_to_delete) for key in keys[:i]: del self.edus[key] # Delete things out of failure map keys = self.failures.keys() - i = keys.bisect_left(position_to_delete) + i = self.failures.bisect_left(position_to_delete) for key in keys[:i]: del self.failures[key] # Delete things out of device map keys = self.device_messages.keys() - i = keys.bisect_left(position_to_delete) + i = self.device_messages.bisect_left(position_to_delete) for key in keys[:i]: del self.device_messages[key] @@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object): self._clear_queue_before_pos(federation_ack) # Fetch changed presence - keys = self.presence_changed.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 + i = self.presence_changed.bisect_right(from_token) + j = self.presence_changed.bisect_right(to_token) + 1 dest_user_ids = [ (pos, user_id) - for pos in keys[i:j] - for user_id in self.presence_changed[pos] + for pos, user_id_list in self.presence_changed.items()[i:j] + for user_id in user_id_list ] for (key, user_id) in dest_user_ids: @@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object): ))) # Fetch changes keyed edus - keys = self.keyed_edu_changed.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 + i = self.keyed_edu_changed.bisect_right(from_token) + j = self.keyed_edu_changed.bisect_right(to_token) + 1 # We purposefully clobber based on the key here, python dict comprehensions # always use the last value, so this will correctly point to the last # stream position. - keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} + keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]} for ((destination, edu_key), pos) in iteritems(keyed_edus): rows.append((pos, KeyedEduRow( @@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object): ))) # Fetch changed edus - keys = self.edus.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - edus = ((k, self.edus[k]) for k in keys[i:j]) + i = self.edus.bisect_right(from_token) + j = self.edus.bisect_right(to_token) + 1 + edus = self.edus.items()[i:j] for (pos, edu) in edus: rows.append((pos, EduRow(edu))) # Fetch changed failures - keys = self.failures.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - failures = ((k, self.failures[k]) for k in keys[i:j]) + i = self.failures.bisect_right(from_token) + j = self.failures.bisect_right(to_token) + 1 + failures = self.failures.items()[i:j] for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( @@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object): ))) # Fetch changed device messages - keys = self.device_messages.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - device_messages = {self.device_messages[k]: k for k in keys[i:j]} + i = self.device_messages.bisect_right(from_token) + j = self.device_messages.bisect_right(to_token) + 1 + device_messages = {v: k for k, v in self.device_messages.items()[i:j]} for (destination, pos) in iteritems(device_messages): rows.append((pos, DeviceRow( diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 56c0032f91..429e79c472 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -60,10 +60,13 @@ class LaterGauge(object): try: calls = self.caller() - except Exception as e: - print(e) - logger.err() + except Exception: + logger.exception( + "Exception running callback for LaterGuage(%s)", + self.name, + ) yield g + return if isinstance(calls, dict): for k, v in calls.items(): diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 478c497722..001c798fe3 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -50,7 +50,7 @@ REQUIREMENTS = { "bcrypt": ["bcrypt>=3.1.0"], "pillow": ["PIL"], "pydenticon": ["pydenticon"], - "blist": ["blist"], + "sortedcontainers": ["sortedcontainers"], "pysaml2>=3.0.0": ["saml2>=3.0.0"], "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], @@ -58,6 +58,7 @@ REQUIREMENTS = { "six": ["six"], "prometheus_client": ["prometheus_client"], } + CONDITIONAL_REQUIREMENTS = { "web_client": { "matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"], diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index a6280aae70..c870475cd1 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -622,7 +622,7 @@ tcp_inbound_commands = LaterGauge( lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in iteritems(p.inbound_commands_counter.counts) + for k, count in iteritems(p.inbound_commands_counter) }) tcp_outbound_commands = LaterGauge( @@ -630,7 +630,7 @@ tcp_outbound_commands = LaterGauge( lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in iteritems(p.outbound_commands_counter.counts) + for k, count in iteritems(p.outbound_commands_counter) }) # number of updates received for each RDATA stream diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7bfc3d91b5..48a88f755e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore): ) txn.execute(sql, (user_id, room_id)) - txn.call_after(self.was_forgotten_at.invalidate_all) txn.call_after(self.did_forget.invalidate, (user_id, room_id)) self._invalidate_cache_and_stream( txn, self.who_forgot_in_room, (room_id,) @@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore): count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) - @cachedInlineCallbacks(num_args=3) - def was_forgotten_at(self, user_id, room_id, event_id): - """Returns whether user_id has elected to discard history for room_id at - event_id. - - event_id must be a membership event.""" - def f(txn): - sql = ( - "SELECT" - " forgotten" - " FROM" - " room_memberships" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - " AND" - " event_id = ?" - ) - txn.execute(sql, (user_id, room_id, event_id)) - rows = txn.fetchall() - return rows[0][0] - forgot = yield self.runInteraction("did_forget_membership_at", f) - defer.returnValue(forgot == 1) - @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index bdee14a8eb..c11bc52177 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -23,7 +23,7 @@ from twisted.internet import defer from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine -from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR +from synapse.util.caches import intern_string, get_cache_factor_for from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.stringutils import to_ascii @@ -57,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore): super(StateGroupWorkerStore, self).__init__(db_conn, hs) self._state_group_cache = DictionaryCache( - "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR + "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache") ) @cached(max_entries=100000, iterable=True) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 183faf75a1..900575eb3c 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -22,6 +22,16 @@ import six CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) + +def get_cache_factor_for(cache_name): + env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper() + factor = os.environ.get(env_var) + if factor: + return float(factor) + + return CACHE_SIZE_FACTOR + + caches_by_name = {} collectors_by_name = {} diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index fc1874b65b..65a1042de1 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -17,7 +17,7 @@ import logging from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError, logcontext -from synapse.util.caches import CACHE_SIZE_FACTOR +from synapse.util.caches import get_cache_factor_for from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry from synapse.util.stringutils import to_ascii @@ -313,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase): orig, num_args=num_args, inlineCallbacks=inlineCallbacks, cache_context=cache_context) - max_entries = int(max_entries * CACHE_SIZE_FACTOR) + max_entries = int(max_entries * get_cache_factor_for(orig.__name__)) self.max_entries = max_entries self.tree = tree diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index a7fe0397fa..817118e30f 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR +from synapse.util import caches -from blist import sorteddict +from sortedcontainers import SortedDict import logging @@ -32,16 +32,18 @@ class StreamChangeCache(object): entities that may have changed since that position. If position key is too old then the cache will simply return all given entities. """ - def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): - self._max_size = int(max_size * CACHE_SIZE_FACTOR) + + def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None): + self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR) self._entity_to_key = {} - self._cache = sorteddict() + self._cache = SortedDict() self._earliest_known_stream_pos = current_stream_pos self.name = name - self.metrics = register_cache("cache", self.name, self._cache) + self.metrics = caches.register_cache("cache", self.name, self._cache) - for entity, stream_pos in prefilled_cache.items(): - self.entity_has_changed(entity, stream_pos) + if prefilled_cache: + for entity, stream_pos in prefilled_cache.items(): + self.entity_has_changed(entity, stream_pos) def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos @@ -65,22 +67,25 @@ class StreamChangeCache(object): return False def get_entities_changed(self, entities, stream_pos): - """Returns subset of entities that have had new things since the - given position. If the position is too old it will just return the given list. + """ + Returns subset of entities that have had new things since the given + position. Entities unknown to the cache will be returned. If the + position is too old it will just return the given list. """ assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - keys = self._cache.keys() - i = keys.bisect_right(stream_pos) + not_known_entities = set(entities) - set(self._entity_to_key) - result = set( - self._cache[k] for k in keys[i:] - ).intersection(entities) + result = ( + set(self._cache.values()[self._cache.bisect_right(stream_pos) :]) + .intersection(entities) + .union(not_known_entities) + ) self.metrics.inc_hits() else: - result = entities + result = set(entities) self.metrics.inc_misses() return result @@ -90,12 +95,13 @@ class StreamChangeCache(object): """ assert type(stream_pos) is int + if not self._cache: + # If we have no cache, nothing can have changed. + return False + if stream_pos >= self._earliest_known_stream_pos: self.metrics.inc_hits() - keys = self._cache.keys() - i = keys.bisect_right(stream_pos) - - return i < len(keys) + return self._cache.bisect_right(stream_pos) < len(self._cache) else: self.metrics.inc_misses() return True @@ -107,10 +113,7 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - keys = self._cache.keys() - i = keys.bisect_right(stream_pos) - - return [self._cache[k] for k in keys[i:]] + return self._cache.values()[self._cache.bisect_right(stream_pos) :] else: return None @@ -129,8 +132,10 @@ class StreamChangeCache(object): self._entity_to_key[entity] = stream_pos while len(self._cache) > self._max_size: - k, r = self._cache.popitem() - self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) + k, r = self._cache.popitem(0) + self._earliest_known_stream_pos = max( + k, self._earliest_known_stream_pos, + ) self._entity_to_key.pop(r, None) def get_max_pos_of_last_change(self, entity): |