diff options
author | Erik Johnston <erik@matrix.org> | 2018-04-12 13:35:56 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-04-12 13:35:56 +0100 |
commit | dd7f1b10cece222db5b57d92727afaf7780f8b79 (patch) | |
tree | e9499a2b7cd19f5efab7d7268461e89a5ad2d19a | |
parent | Merge branch 'develop' of https://github.com/matrix-org/synapse into matrix-o... (diff) | |
parent | Merge pull request #3092 from matrix-org/rav/response_cache_metrics (diff) | |
download | synapse-dd7f1b10cece222db5b57d92727afaf7780f8b79.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
-rw-r--r-- | synapse/appservice/api.py | 3 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 2 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 14 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 48 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 37 | ||||
-rw-r--r-- | synapse/handlers/message.py | 104 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 5 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 3 | ||||
-rw-r--r-- | synapse/metrics/__init__.py | 43 | ||||
-rw-r--r-- | synapse/metrics/metric.py | 32 | ||||
-rw-r--r-- | synapse/python_dependencies.py | 2 | ||||
-rw-r--r-- | synapse/replication/http/send_event.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 7 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 4 | ||||
-rw-r--r-- | synapse/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/storage/engines/__init__.py | 5 | ||||
-rw-r--r-- | synapse/storage/events.py | 3 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 20 | ||||
-rw-r--r-- | synapse/types.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 46 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 4 | ||||
-rw-r--r-- | tests/handlers/test_appservice.py | 1 |
22 files changed, 301 insertions, 88 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 40c433d7ae..11e9c37c63 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -73,7 +73,8 @@ class ApplicationServiceApi(SimpleHttpClient): super(ApplicationServiceApi, self).__init__(hs) self.clock = hs.get_clock() - self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS) + self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta", + timeout_ms=HOUR_IN_MS) @defer.inlineCallbacks def query_user(self, service, user_id): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bea7fd0b71..e4ce037acf 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -65,7 +65,7 @@ class FederationServer(FederationBase): # We cache responses to state queries, as they take a while and often # come in waves. - self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) + self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000) @defer.inlineCallbacks @log_function diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 93e5acebc1..945832283f 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure import synapse.metrics -from blist import sorteddict +from sortedcontainers import SortedDict from collections import namedtuple import logging @@ -56,19 +56,19 @@ class FederationRemoteSendQueue(object): self.is_mine_id = hs.is_mine_id self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = sorteddict() # Stream position -> user_id + self.presence_changed = SortedDict() # Stream position -> user_id self.keyed_edu = {} # (destination, key) -> EDU - self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) + self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) - self.edus = sorteddict() # stream position -> Edu + self.edus = SortedDict() # stream position -> Edu - self.failures = sorteddict() # stream position -> (destination, Failure) + self.failures = SortedDict() # stream position -> (destination, Failure) - self.device_messages = sorteddict() # stream position -> destination + self.device_messages = SortedDict() # stream position -> destination self.pos = 1 - self.pos_time = sorteddict() + self.pos_time = SortedDict() # EVERYTHING IS SAD. In particular, python only makes new scopes when # we make a new function, so we need to make a new function so the inner diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 6da60718a9..965c1d2fda 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -169,7 +169,7 @@ class TransactionQueue(object): while True: last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( - last_token, self._last_poked_id, limit=20, + last_token, self._last_poked_id, limit=100, ) logger.debug("Handling %s -> %s", last_token, next_token) @@ -177,12 +177,13 @@ class TransactionQueue(object): if not events and next_token >= self._last_poked_id: break - for event in events: + @defer.inlineCallbacks + def handle_event(event): # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.event_id) if not is_mine and send_on_behalf_of is None: - continue + return try: # Get the state from before the event. @@ -197,8 +198,11 @@ class TransactionQueue(object): ], ) except Exception: - logger.exception("Failed to calculate hosts in room") - continue + logger.exception( + "Failed to calculate hosts in room for event: %s", + event.event_id, + ) + return destinations = set(destinations) @@ -212,12 +216,44 @@ class TransactionQueue(object): self._send_pdu(event, destinations) - events_processed_counter.inc_by(len(events)) + @defer.inlineCallbacks + def handle_room_events(events): + for event in events: + yield handle_event(event) + + events_by_room = {} + for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.run_in_background(handle_room_events, evs) + for evs in events_by_room.itervalues() + ], + consumeErrors=True + )) yield self.store.update_federation_out_pos( "events", next_token ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3dd3fa2a27..0245197c02 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -18,7 +18,9 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import ( + make_deferred_yieldable, preserve_fn, run_in_background, +) import logging @@ -84,11 +86,16 @@ class ApplicationServicesHandler(object): if not events: break + events_by_room = {} for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + @defer.inlineCallbacks + def handle_event(event): # Gather interested services services = yield self._get_services_for_event(event) if len(services) == 0: - continue # no services need notifying + return # no services need notifying # Do we know this user exists? If not, poke the user # query API for all services which match that user regex. @@ -108,9 +115,33 @@ class ApplicationServicesHandler(object): service, event ) - events_processed_counter.inc_by(len(events)) + @defer.inlineCallbacks + def handle_room_events(events): + for event in events: + yield handle_event(event) + + yield make_deferred_yieldable(defer.gatherResults([ + run_in_background(handle_room_events, evs) + for evs in events_by_room.itervalues() + ], consumeErrors=True)) yield self.store.set_appservice_last_pos(upper_bound) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) + + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2134741719..726a3932da 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -454,40 +454,39 @@ class EventCreationHandler(object): """ builder = self.event_builder_factory.new(event_dict) - with (yield self.limiter.queue(builder.room_id)): - self.validator.validate_new(builder) - - if builder.type == EventTypes.Member: - membership = builder.content.get("membership", None) - target = UserID.from_string(builder.state_key) - - if membership in {Membership.JOIN, Membership.INVITE}: - # If event doesn't include a display name, add one. - profile = self.profile_handler - content = builder.content - - try: - if "displayname" not in content: - content["displayname"] = yield profile.get_displayname(target) - if "avatar_url" not in content: - content["avatar_url"] = yield profile.get_avatar_url(target) - except Exception as e: - logger.info( - "Failed to get profile information for %r: %s", - target, e - ) + self.validator.validate_new(builder) + + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + + if membership in {Membership.JOIN, Membership.INVITE}: + # If event doesn't include a display name, add one. + profile = self.profile_handler + content = builder.content + + try: + if "displayname" not in content: + content["displayname"] = yield profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) - if token_id is not None: - builder.internal_metadata.token_id = token_id + if token_id is not None: + builder.internal_metadata.token_id = token_id - if txn_id is not None: - builder.internal_metadata.txn_id = txn_id + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id - event, context = yield self.create_new_client_event( - builder=builder, - requester=requester, - prev_event_ids=prev_event_ids, - ) + event, context = yield self.create_new_client_event( + builder=builder, + requester=requester, + prev_event_ids=prev_event_ids, + ) defer.returnValue((event, context)) @@ -557,27 +556,34 @@ class EventCreationHandler(object): See self.create_event and self.send_nonmember_event. """ - event, context = yield self.create_event( - requester, - event_dict, - token_id=requester.access_token_id, - txn_id=txn_id - ) - spam_error = self.spam_checker.check_event_for_spam(event) - if spam_error: - if not isinstance(spam_error, basestring): - spam_error = "Spam is not permitted here" - raise SynapseError( - 403, spam_error, Codes.FORBIDDEN + # We limit the number of concurrent event sends in a room so that we + # don't fork the DAG too much. If we don't limit then we can end up in + # a situation where event persistence can't keep up, causing + # extremities to pile up, which in turn leads to state resolution + # taking longer. + with (yield self.limiter.queue(event_dict["room_id"])): + event, context = yield self.create_event( + requester, + event_dict, + token_id=requester.access_token_id, + txn_id=txn_id ) - yield self.send_nonmember_event( - requester, - event, - context, - ratelimit=ratelimit, - ) + spam_error = self.spam_checker.check_event_for_spam(event) + if spam_error: + if not isinstance(spam_error, basestring): + spam_error = "Spam is not permitted here" + raise SynapseError( + 403, spam_error, Codes.FORBIDDEN + ) + + yield self.send_nonmember_event( + requester, + event, + context, + ratelimit=ratelimit, + ) defer.returnValue(event) @measure_func("create_new_client_event") diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index f5657a3521..ef3a3b352a 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -44,8 +44,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000) - self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) + self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000) + self.remote_response_cache = ResponseCache(hs, "remote_room_list", + timeout_ms=30 * 1000) def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 37fae43f71..da234c741f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -171,7 +171,8 @@ class SyncHandler(object): self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() self.response_cache = ResponseCache( - hs, timeout_ms=SYNC_RESPONSE_CACHE_MS, + hs, "sync", + timeout_ms=SYNC_RESPONSE_CACHE_MS, ) self.state = hs.get_state_handler() diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 50d99d7a5c..e3b831db67 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -17,12 +17,13 @@ import logging import functools import time import gc +import platform from twisted.internet import reactor from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - MemoryUsageMetric, + MemoryUsageMetric, GaugeMetric, ) from .process_collector import register_process_collector @@ -30,6 +31,7 @@ from .process_collector import register_process_collector logger = logging.getLogger(__name__) +running_on_pypy = platform.python_implementation() == 'PyPy' all_metrics = [] all_collectors = [] @@ -63,6 +65,13 @@ class Metrics(object): """ return self._register(CounterMetric, *args, **kwargs) + def register_gauge(self, *args, **kwargs): + """ + Returns: + GaugeMetric + """ + return self._register(GaugeMetric, *args, **kwargs) + def register_callback(self, *args, **kwargs): """ Returns: @@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +synapse_metrics = get_metrics_for("synapse") + +# Used to track where various components have processed in the event stream, +# e.g. federation sending, appservice sending, etc. +event_processing_positions = synapse_metrics.register_gauge( + "event_processing_positions", labels=["name"], +) + +# Used to track the current max events stream position +event_persisted_position = synapse_metrics.register_gauge( + "event_persisted_position", +) + +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): @@ -174,6 +209,9 @@ def runUntilCurrentTimer(func): tick_time.inc_by(end - start) pending_calls_metric.inc_by(num_pending) + if running_on_pypy: + return ret + # Check if we need to do a manual GC (since its been disabled), and do # one if necessary. threshold = gc.get_threshold() @@ -206,6 +244,7 @@ try: # We manually run the GC each reactor tick so that we can get some metrics # about time spent doing GC, - gc.disable() + if not running_on_pypy: + gc.disable() except AttributeError: pass diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index ff5aa8c0e1..89bd47c3f7 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -115,7 +115,7 @@ class CounterMetric(BaseMetric): # dict[list[str]]: value for each set of label values. the keys are the # label values, in the same order as the labels in self.labels. # - # (if the metric is a scalar, the (single) key is the empty list). + # (if the metric is a scalar, the (single) key is the empty tuple). self.counts = {} # Scalar metrics are never empty @@ -145,6 +145,36 @@ class CounterMetric(BaseMetric): ) +class GaugeMetric(BaseMetric): + """A metric that can go up or down + """ + + def __init__(self, *args, **kwargs): + super(GaugeMetric, self).__init__(*args, **kwargs) + + # dict[list[str]]: value for each set of label values. the keys are the + # label values, in the same order as the labels in self.labels. + # + # (if the metric is a scalar, the (single) key is the empty tuple). + self.guages = {} + + def set(self, v, *values): + if len(values) != self.dimension(): + raise ValueError( + "Expected as many values to inc() as labels (%d)" % (self.dimension()) + ) + + # TODO: should assert that the tag values are all strings + + self.guages[values] = v + + def render(self): + return flatten( + self._render_for_labels(k, self.guages[k]) + for k in sorted(self.guages.keys()) + ) + + class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever it is rendered. Typically this is used to implement gauges that yield the diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 40eedb63cb..f9596bddaf 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -34,8 +34,8 @@ REQUIREMENTS = { "bcrypt": ["bcrypt>=3.1.0"], "pillow": ["PIL"], "pydenticon": ["pydenticon"], - "blist": ["blist"], "pysaml2>=3.0.0": ["saml2>=3.0.0"], + "sortedcontainers": ["sortedcontainers"], "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index bbe2f967b7..c6a6551d24 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -115,7 +115,7 @@ class ReplicationSendEventRestServlet(RestServlet): self.clock = hs.get_clock() # The responses are tiny, so we may as well cache them for a while - self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000) + self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) def on_PUT(self, request, event_id): result = self.response_cache.get(event_id) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index d06cbdc35e..2ad0e5943b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet): content=content, ) else: - event, context = yield self.event_creation_hander.create_event( + event = yield self.event_creation_hander.create_and_send_nonmember_event( requester, event_dict, - token_id=requester.access_token_id, txn_id=txn_id, ) - yield self.event_creation_hander.send_nonmember_event( - requester, event, context, - ) - ret = {} if event: ret = {"event_id": event.event_id} diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index eacd49d6a5..8cdfd50f90 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore, def count_r30_users(self): """ Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days + * Users who have created their accounts more than 30 days ago * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days + * Where account creation and last_seen are > 30 days apart Returns counts globaly for a given user as well as breaking by platform diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2fbebd4907..2262776ab2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -376,7 +376,7 @@ class SQLBaseStore(object): Returns: A list of dicts where the key is the column header. """ - col_headers = list(intern(column[0]) for column in cursor.description) + col_headers = list(intern(str(column[0])) for column in cursor.description) results = list( dict(zip(col_headers, row)) for row in cursor ) diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 338b495611..8c868ece75 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -18,6 +18,7 @@ from .postgres import PostgresEngine from .sqlite3 import Sqlite3Engine import importlib +import platform SUPPORTED_MODULE = { @@ -31,6 +32,10 @@ def create_engine(database_config): engine_class = SUPPORTED_MODULE.get(name, None) if engine_class: + # pypy requires psycopg2cffi rather than psycopg2 + if (name == "psycopg2" and + platform.python_implementation() == "PyPy"): + name = "psycopg2cffi" module = importlib.import_module(name) return engine_class(module, database_config) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d1ef3ea46..99557edc33 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) for event, context in chunk: if context.app_service: origin_type = "local" diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..a937b9bceb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event. + + Raises an exception for unknown events. + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timestamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, diff --git a/synapse/types.py b/synapse/types.py index 7cb24cecb2..cc7c182a78 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -169,7 +169,7 @@ class DomainSpecificString( except Exception: return False - __str__ = to_string + __repr__ = to_string class UserID(DomainSpecificString): diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 00af539880..066fa423fd 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.util.async import ObservableDeferred +from synapse.util.caches import metrics as cache_metrics class ResponseCache(object): @@ -24,20 +25,63 @@ class ResponseCache(object): used rather than trying to compute a new response. """ - def __init__(self, hs, timeout_ms=0): + def __init__(self, hs, name, timeout_ms=0): self.pending_result_cache = {} # Requests that haven't finished yet. self.clock = hs.get_clock() self.timeout_sec = timeout_ms / 1000. + self._metrics = cache_metrics.register_cache( + "response_cache", + size_callback=lambda: self.size(), + cache_name=name, + ) + + def size(self): + return len(self.pending_result_cache) + def get(self, key): + """Look up the given key. + + Returns a deferred which doesn't follow the synapse logcontext rules, + so you'll probably want to make_deferred_yieldable it. + + Args: + key (str): + + Returns: + twisted.internet.defer.Deferred|None: None if there is no entry + for this key; otherwise a deferred result. + """ result = self.pending_result_cache.get(key) if result is not None: + self._metrics.inc_hits() return result.observe() else: + self._metrics.inc_misses() return None def set(self, key, deferred): + """Set the entry for the given key to the given deferred. + + *deferred* should run its callbacks in the sentinel logcontext (ie, + you should wrap normal synapse deferreds with + logcontext.run_in_background). + + Returns a new Deferred which also doesn't follow the synapse logcontext + rules, so you will want to make_deferred_yieldable it + + (TODO: before using this more widely, it might make sense to refactor + it and get() so that they do the necessary wrapping rather than having + to do it everywhere ResponseCache is used.) + + Args: + key (str): + deferred (twisted.internet.defer.Deferred): + + Returns: + twisted.internet.defer.Deferred + """ result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 941d873ab8..2ff46090a6 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -16,7 +16,7 @@ from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR -from blist import sorteddict +from sortedcontainers import SortedDict import logging @@ -35,7 +35,7 @@ class StreamChangeCache(object): def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): self._max_size = int(max_size * CACHE_SIZE_FACTOR) self._entity_to_key = {} - self._cache = sorteddict() + self._cache = SortedDict() self._earliest_known_stream_pos = current_stream_pos self.name = name self.metrics = register_cache(self.name, self._cache) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a667fb6f0e..b753455943 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -31,6 +31,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) + self.mock_store.get_received_ts.return_value = 0 hs.get_application_service_api = Mock(return_value=self.mock_as_api) hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler) hs.get_clock.return_value = MockClock() |