-rw-r--r-- | .gitignore                                 |   1
-rw-r--r-- | CHANGES.rst                                |  58
-rw-r--r-- | docs/metrics-howto.rst                     |  50
-rw-r--r-- | docs/postgres.rst                          |  18
-rw-r--r-- | setup.cfg                                  |   3
-rw-r--r-- | synapse/__init__.py                        |   2
-rw-r--r-- | synapse/appservice/__init__.py             |   6
-rw-r--r-- | synapse/config/consent_config.py           |   3
-rw-r--r-- | synapse/federation/send_queue.py           |  63
-rw-r--r-- | synapse/http/__init__.py                   |  13
-rw-r--r-- | synapse/http/client.py                     |   9
-rw-r--r-- | synapse/http/site.py                       |   9
-rw-r--r-- | synapse/python_dependencies.py             |   4
-rw-r--r-- | synapse/rest/client/v1/admin.py            |  12
-rw-r--r-- | synapse/storage/state.py                   |   2
-rw-r--r-- | synapse/util/caches/stream_change_cache.py |  57
-rw-r--r-- | tests/util/test_stream_change_cache.py     | 198
-rw-r--r-- | tox.ini                                    |  30
18 files changed, 415 insertions, 123 deletions
diff --git a/.gitignore b/.gitignore
index 7940158c5b..9f42a7568f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,6 +43,7 @@ media_store/
 
 build/
 venv/
+venv*/
 
 localhost-800*/
 static/client/register/register_config.js
diff --git a/CHANGES.rst b/CHANGES.rst
index 0569b581db..f2b7f04097 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,61 @@
+Changes in synapse v0.31.0 (2018-06-06)
+=======================================
+
+The most notable change since v0.30.0 is the switch to the Python Prometheus
+library, which improves system stats reporting. WARNING: this changes a number
+of Prometheus metrics in a backwards-incompatible manner. For more details, see
+`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.
+
+Bug fixes:
+
+* Fix metric documentation tables (PR #3341)
+* Fix LaterGauge error handling (694968f)
+* Fix replication metrics (b7e7fd2)
+
+Changes in synapse v0.31.0-rc1 (2018-06-04)
+===========================================
+
+Features:
+
+* Switch to the Python Prometheus library (PR #3256, #3274)
+* Let users leave the server notice room after joining (PR #3287)
+
+
+Changes:
+
+* Add daily user type phone-home stats (PR #3264)
+* Use iter* methods for _filter_events_for_server (PR #3267)
+* Add docs on consent bits (PR #3268)
+* Remove users from the user directory on deactivation (PR #3277)
+* Avoid sending consent notices to guest users (PR #3288)
+* Disable CPUMetrics if there is no /proc/self/stat (PR #3299)
+* Add local and loopback IPv6 addresses to url_preview_ip_range_blacklist (PR #3312) Thanks to @thegcat!
+* Consistently use six's iteritems, and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
+* Add private IPv6 addresses to the example config for the url preview blacklist (PR #3317) Thanks to @thegcat!
+* Reduce stuck read-receipts: ignore depth when updating (PR #3318)
+* Put Python's logs into Trial when running unit tests (PR #3319)
+
+Changes, Python 3 migration:
+
+* Replace some more comparisons with six (PR #3243) Thanks to @NotAFile!
+* Replace some iteritems with six (PR #3244) Thanks to @NotAFile!
+* Add batch_iter to utils (PR #3245) Thanks to @NotAFile!
+* Use repr, not str (PR #3246) Thanks to @NotAFile!
+* Misc Python 3 fixes (PR #3247) Thanks to @NotAFile!
+* Py3-ify storage/_base.py (PR #3278) Thanks to @NotAFile!
+* More six iteritems (PR #3279) Thanks to @NotAFile!
+* More misc. py3 fixes (PR #3280) Thanks to @NotAFile!
+* Remaining isinstance fixes (PR #3281) Thanks to @NotAFile!
+* Py3-ify state.py (PR #3283) Thanks to @NotAFile!
+* Extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel!
+* Use memoryview in py3 (PR #3303) Thanks to @NotAFile!
+
+Bug fixes:
+
+* Fix federation backfill bugs (PR #3261)
+* federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
+
+
 Changes in synapse v0.30.0 (2018-05-24)
 ==========================================
diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst
index 25e06bca58..5bbb5a4f3a 100644
--- a/docs/metrics-howto.rst
+++ b/docs/metrics-howto.rst
@@ -63,30 +63,40 @@ The duplicated metrics deprecated in Synapse 0.27.0 have been removed.
 
 All time duration-based metrics have been changed to be seconds. This
 affects:
 
-================================
-msec -> sec metrics
-================================
-python_gc_time
-python_twisted_reactor_tick_time
-synapse_storage_query_time
-synapse_storage_schedule_time
-synapse_storage_transaction_time
-================================
++----------------------------------+
+| msec -> sec metrics              |
++==================================+
+| python_gc_time                   |
++----------------------------------+
+| python_twisted_reactor_tick_time |
++----------------------------------+
+| synapse_storage_query_time       |
++----------------------------------+
+| synapse_storage_schedule_time    |
++----------------------------------+
+| synapse_storage_transaction_time |
++----------------------------------+
 
 Several metrics have been changed to be histograms, which sort entries into
 buckets and allow better analysis. The following metrics are now histograms:
 
-==========================================
-Altered metrics
-==========================================
-python_gc_time
-python_twisted_reactor_pending_calls
-python_twisted_reactor_tick_time
-synapse_http_server_response_time_seconds
-synapse_storage_query_time
-synapse_storage_schedule_time
-synapse_storage_transaction_time
-==========================================
++--------------------------------------------+
+| Altered metrics                            |
++============================================+
+| python_gc_time                             |
++--------------------------------------------+
+| python_twisted_reactor_pending_calls       |
++--------------------------------------------+
+| python_twisted_reactor_tick_time           |
++--------------------------------------------+
+| synapse_http_server_response_time_seconds  |
++--------------------------------------------+
+| synapse_storage_query_time                 |
++--------------------------------------------+
+| synapse_storage_schedule_time              |
++--------------------------------------------+
+| synapse_storage_transaction_time           |
++--------------------------------------------+
 
 Block and response metrics renamed for 0.27.0
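
To see what the counter-to-histogram move means for instrumented code, here is
a minimal sketch using the prometheus_client API. Only the metric name is taken
from the table above; the timing wrapper is hypothetical, not code from this
commit::

    import time

    from prometheus_client import Histogram

    # Durations are now observed in seconds and sorted into buckets, so
    # quantiles can be derived at query time instead of only a running total.
    tick_time = Histogram(
        "python_twisted_reactor_tick_time",
        "Tick time of the Twisted reactor (seconds)",
    )

    def run_tick(tick):
        start = time.time()
        try:
            tick()
        finally:
            tick_time.observe(time.time() - start)  # seconds, not msec
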
diff --git a/docs/postgres.rst b/docs/postgres.rst
index 296293e859..2377542296 100644
--- a/docs/postgres.rst
+++ b/docs/postgres.rst
@@ -9,19 +9,19 @@ Set up database
 Assuming your PostgreSQL database user is called ``postgres``, create a user
 ``synapse_user`` with::
 
- su - postgres
- createuser --pwprompt synapse_user
+    su - postgres
+    createuser --pwprompt synapse_user
 
 The PostgreSQL database used *must* have the correct encoding set, otherwise it
 would not be able to store UTF8 strings. To create a database with the correct
 encoding use, e.g.::
 
- CREATE DATABASE synapse
-  ENCODING 'UTF8'
-  LC_COLLATE='C'
-  LC_CTYPE='C'
-  template=template0
-  OWNER synapse_user;
+    CREATE DATABASE synapse
+        ENCODING 'UTF8'
+        LC_COLLATE='C'
+        LC_CTYPE='C'
+        template=template0
+        OWNER synapse_user;
 
 This would create an appropriate database named ``synapse`` owned by the
 ``synapse_user`` user (which must already exist).
@@ -126,7 +126,7 @@ run::
     --postgres-config homeserver-postgres.yaml
 
 Once that has completed, change the synapse config to point at the PostgreSQL
-database configuration file ``homeserver-postgres.yaml``:
+database configuration file ``homeserver-postgres.yaml``::
 
     ./synctl stop
     mv homeserver.yaml homeserver-old-sqlite.yaml
diff --git a/setup.cfg b/setup.cfg
index da8eafbb39..fa6f2d1ce4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -17,4 +17,5 @@ ignore =
 [flake8]
 max-line-length = 90
 # W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
-ignore = W503
+# E203 is contrary to PEP8.
+ignore = W503,E203
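
The newly ignored E203 warning fires on whitespace before a colon in a slice,
which is exactly the slice style several hunks below use (e.g.
``values()[self._cache.bisect_right(stream_pos) :]``). A small illustration,
not taken from the commit::

    values = [10, 20, 30, 40]
    pos = 2

    with_space = values[pos :]    # flake8 E203 would flag this slice...
    without_space = values[pos:]  # ...but not this one; both are equivalent.
    assert with_space == without_space == [30, 40]
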
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 5bada5e290..ca113db434 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.30.0"
+__version__ = "0.31.0"
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 5fdb579723..d1c598622a 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -292,4 +292,8 @@ class ApplicationService(object):
         return self.rate_limited
 
     def __str__(self):
-        return "ApplicationService: %s" % (self.__dict__,)
+        # copy dictionary and redact token fields so they don't get logged
+        dict_copy = self.__dict__.copy()
+        dict_copy["token"] = "<redacted>"
+        dict_copy["hs_token"] = "<redacted>"
+        return "ApplicationService: %s" % (dict_copy,)
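
The effect of the ``__str__`` change is that application-service secrets no
longer leak into log lines. A self-contained sketch of the pattern (the real
class takes more constructor arguments than shown here)::

    class ApplicationService(object):
        def __init__(self, token, hs_token, url):
            self.token = token
            self.hs_token = hs_token
            self.url = url

        def __str__(self):
            # copy the instance dict and redact the secrets before formatting
            dict_copy = self.__dict__.copy()
            dict_copy["token"] = "<redacted>"
            dict_copy["hs_token"] = "<redacted>"
            return "ApplicationService: %s" % (dict_copy,)

    service = ApplicationService("as_secret", "hs_secret", "http://localhost:9000")
    assert "as_secret" not in str(service)  # tokens no longer appear in logs
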
diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py
index 8f6ed73328..e22c731aad 100644
--- a/synapse/config/consent_config.py
+++ b/synapse/config/consent_config.py
@@ -18,6 +18,9 @@ from ._base import Config
 DEFAULT_CONFIG = """\
 # User Consent configuration
 #
+# for detailed instructions, see
+# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md
+#
 # Parts of this section are required if enabling the 'consent' resource under
 # 'listeners', in particular 'template_dir' and 'version'.
 #
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 9f1142b5a9..1d5c0f3797 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 from synapse.metrics import LaterGauge
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 from collections import namedtuple
 
 import logging
@@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object):
         self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = sorteddict()  # Stream position -> user_id
+        self.presence_changed = SortedDict()  # Stream position -> user_id
 
         self.keyed_edu = {}  # (destination, key) -> EDU
-        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
+        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()  # stream position -> Edu
+        self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = sorteddict()  # stream position -> (destination, Failure)
+        self.failures = SortedDict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()  # stream position -> destination
+        self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
-        self.pos_time = sorteddict()
+        self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
@@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object):
         now = self.clock.time_msec()
 
         keys = self.pos_time.keys()
-        time = keys.bisect_left(now - FIVE_MINUTES_AGO)
+        time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
         if not keys[:time]:
             return
 
@@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object):
         with Measure(self.clock, "send_queue._clear"):
             # Delete things out of presence maps
             keys = self.presence_changed.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.presence_changed.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.presence_changed[key]
 
@@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object):
 
             # Delete things out of keyed edus
             keys = self.keyed_edu_changed.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.keyed_edu_changed.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.keyed_edu_changed[key]
 
@@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object):
 
             # Delete things out of edu map
             keys = self.edus.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.edus.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.edus[key]
 
             # Delete things out of failure map
             keys = self.failures.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.failures.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.failures[key]
 
             # Delete things out of device map
             keys = self.device_messages.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.device_messages.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.device_messages[key]
 
@@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object):
             self._clear_queue_before_pos(federation_ack)
 
         # Fetch changed presence
-        keys = self.presence_changed.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
+        i = self.presence_changed.bisect_right(from_token)
+        j = self.presence_changed.bisect_right(to_token) + 1
         dest_user_ids = [
             (pos, user_id)
-            for pos in keys[i:j]
-            for user_id in self.presence_changed[pos]
+            for pos, user_id_list in self.presence_changed.items()[i:j]
+            for user_id in user_id_list
         ]
 
         for (key, user_id) in dest_user_ids:
@@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changes keyed edus
-        keys = self.keyed_edu_changed.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
+        i = self.keyed_edu_changed.bisect_right(from_token)
+        j = self.keyed_edu_changed.bisect_right(to_token) + 1
         # We purposefully clobber based on the key here, python dict comprehensions
         # always use the last value, so this will correctly point to the last
         # stream position.
-        keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
+        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
 
         for ((destination, edu_key), pos) in iteritems(keyed_edus):
             rows.append((pos, KeyedEduRow(
@@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changed edus
-        keys = self.edus.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        edus = ((k, self.edus[k]) for k in keys[i:j])
+        i = self.edus.bisect_right(from_token)
+        j = self.edus.bisect_right(to_token) + 1
+        edus = self.edus.items()[i:j]
 
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
         # Fetch changed failures
-        keys = self.failures.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        failures = ((k, self.failures[k]) for k in keys[i:j])
+        i = self.failures.bisect_right(from_token)
+        j = self.failures.bisect_right(to_token) + 1
+        failures = self.failures.items()[i:j]
 
         for (pos, (destination, failure)) in failures:
             rows.append((pos, FailureRow(
@@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changed device messages
-        keys = self.device_messages.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        device_messages = {self.device_messages[k]: k for k in keys[i:j]}
+        i = self.device_messages.bisect_right(from_token)
+        j = self.device_messages.bisect_right(to_token) + 1
+        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
 
         for (destination, pos) in iteritems(device_messages):
             rows.append((pos, DeviceRow(
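
The blist-to-sortedcontainers migration above is mostly mechanical: blist's
``sorteddict`` exposed bisection on its ``keys()`` view, while sortedcontainers
puts ``bisect_left``/``bisect_right`` on the dict itself and lets the
``items()``/``values()`` views be sliced directly. A small sketch of the new
idiom, with made-up data::

    from sortedcontainers import SortedDict

    edus = SortedDict({1: "edu-a", 3: "edu-b", 5: "edu-c"})  # stream pos -> Edu

    # Old (blist):  keys = edus.keys(); i = keys.bisect_right(from_token)
    # New: bisect on the SortedDict itself, then slice the items() view.
    i = edus.bisect_right(1)      # first position strictly after token 1
    j = edus.bisect_right(5) + 1
    assert edus.items()[i:j] == [(3, "edu-b"), (5, "edu-c")]
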
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 054372e179..58ef8d3ce4 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -13,6 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import re
+
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
 
@@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
         value.trap(CancelledError)
         raise RequestTimedOutError()
     return value
+
+
+ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+
+
+def redact_uri(uri):
+    """Strips access tokens from the uri and replaces them with <redacted>"""
+    return ACCESS_TOKEN_RE.sub(
+        br'\1<redacted>\3',
+        uri
+    )
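
The new helper operates on byte-strings, since that is how Twisted hands the
URI over. A quick usage illustration (the query string is made up)::

    from synapse.http import redact_uri

    uri = b"/_matrix/client/r0/sync?access_token=secret&timeout=30000"
    assert redact_uri(uri) == (
        b"/_matrix/client/r0/sync?access_token=<redacted>&timeout=30000"
    )
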
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4d4eee3d64..8064a84c5c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -19,7 +19,7 @@ from OpenSSL.SSL import VERIFY_NONE
 from synapse.api.errors import (
     CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
 )
-from synapse.http import cancelled_to_request_timed_out_error
+from synapse.http import cancelled_to_request_timed_out_error, redact_uri
 from synapse.util.async import add_timeout_to_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
@@ -90,7 +90,8 @@ class SimpleHttpClient(object):
         # counters to it
         outgoing_requests_counter.labels(method).inc()
 
-        logger.info("Sending request %s %s", method, uri)
+        # log request but strip `access_token` (AS requests for example include this)
+        logger.info("Sending request %s %s", method, redact_uri(uri))
 
         try:
             request_deferred = self.agent.request(
@@ -105,14 +106,14 @@ class SimpleHttpClient(object):
 
             incoming_responses_counter.labels(method, response.code).inc()
             logger.info(
                 "Received response to %s %s: %s",
-                method, uri, response.code
+                method, redact_uri(uri), response.code
             )
             defer.returnValue(response)
         except Exception as e:
             incoming_responses_counter.labels(method, "ERR").inc()
             logger.info(
                 "Error sending request to %s %s: %s %s",
-                method, uri, type(e).__name__, e.message
+                method, redact_uri(uri), type(e).__name__, e.message
             )
             raise e
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 60299657b9..2664006f8c 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,18 +14,16 @@
 
 import contextlib
 import logging
-import re
 import time
 
 from twisted.web.server import Site, Request
 
+from synapse.http import redact_uri
 from synapse.http.request_metrics import RequestMetrics
 from synapse.util.logcontext import LoggingContext
 
 logger = logging.getLogger(__name__)
 
-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
-
 _next_request_seq = 0
 
@@ -69,10 +67,7 @@ class SynapseRequest(Request):
         return "%s-%i" % (self.method, self.request_seq)
 
     def get_redacted_uri(self):
-        return ACCESS_TOKEN_RE.sub(
-            br'\1<redacted>\3',
-            self.uri
-        )
+        return redact_uri(self.uri)
 
     def get_user_agent(self):
         return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 478c497722..faf6dfdb8d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -50,14 +50,16 @@ REQUIREMENTS = {
     "bcrypt": ["bcrypt>=3.1.0"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
-    "blist": ["blist"],
+    "sortedcontainers": ["sortedcontainers"],
     "pysaml2>=3.0.0": ["saml2>=3.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
     "six": ["six"],
     "prometheus_client": ["prometheus_client"],
+    "attr": ["attr"],
 }
+
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
         "matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 6835a7bba2..b8665a45eb 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 yield self.store.find_first_stream_ordering_after_ts(ts)
             )
 
-            room_event_after_stream_ordering = (
+            r = (
                 yield self.store.get_room_event_after_stream_ordering(
                     room_id, stream_ordering,
                 )
             )
-            if room_event_after_stream_ordering:
-                token = yield self.store.get_topological_token_for_event(
-                    room_event_after_stream_ordering,
-                )
-            else:
+            if not r:
                 logger.warn(
                     "[purge] purging events not possible: No event found "
                     "(received_ts %i => stream_ordering %i)",
@@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                     "there is no event to be purged",
                     errcode=Codes.NOT_FOUND,
                 )
+            (stream, topo, _event_id) = r
+            token = "t%d-%d" % (topo, stream)
             logger.info(
-                "[purge] purging up to token %d (received_ts %i => "
+                "[purge] purging up to token %s (received_ts %i => "
                 "stream_ordering %i)",
                 token, ts, stream_ordering,
             )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c11bc52177..85b8ec2b8f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -272,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore):
             for typ in types:
                 if typ[1] is None:
                     where_clauses.append("(type = ?)")
-                    where_args.extend(typ[0])
+                    where_args.append(typ[0])
                     wildcard_types = True
                 else:
                     where_clauses.append("(type = ? AND state_key = ?)")
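
The ``storage/state.py`` fix above is a classic list pitfall: ``extend()``
iterates its argument, so extending with a string appends one character per
SQL bind parameter instead of the whole event type. An illustration with a
made-up event type::

    where_args = []

    # The bug: extending with a string adds each character separately.
    where_args.extend("m.room.member")
    assert where_args == list("m.room.member")  # 13 one-character entries

    # The fix: append the event type as a single bind parameter.
    where_args = []
    where_args.append("m.room.member")
    assert where_args == ["m.room.member"]
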
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index a7fe0397fa..817118e30f 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util import caches
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 
 import logging
 
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
     entities that may have changed since that position. If position key is too
     old then the cache will simply return all given entities.
     """
-    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+        self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = sorteddict()
+        self._cache = SortedDict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        self.metrics = register_cache("cache", self.name, self._cache)
+        self.metrics = caches.register_cache("cache", self.name, self._cache)
 
-        for entity, stream_pos in prefilled_cache.items():
-            self.entity_has_changed(entity, stream_pos)
+        if prefilled_cache:
+            for entity, stream_pos in prefilled_cache.items():
+                self.entity_has_changed(entity, stream_pos)
 
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
         return False
 
     def get_entities_changed(self, entities, stream_pos):
-        """Returns subset of entities that have had new things since the
-        given position. If the position is too old it will just return the given list.
+        """
+        Returns subset of entities that have had new things since the given
+        position. Entities unknown to the cache will be returned. If the
+        position is too old it will just return the given list.
         """
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
+            not_known_entities = set(entities) - set(self._entity_to_key)
 
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(entities)
+            result = (
+                set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+                .intersection(entities)
+                .union(not_known_entities)
+            )
 
             self.metrics.inc_hits()
         else:
-            result = entities
+            result = set(entities)
             self.metrics.inc_misses()
 
         return result
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
         """
         assert type(stream_pos) is int
 
+        if not self._cache:
+            # If we have no cache, nothing can have changed.
+            return False
+
         if stream_pos >= self._earliest_known_stream_pos:
             self.metrics.inc_hits()
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return i < len(keys)
+            return self._cache.bisect_right(stream_pos) < len(self._cache)
         else:
             self.metrics.inc_misses()
             return True
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return [self._cache[k] for k in keys[i:]]
+            return self._cache.values()[self._cache.bisect_right(stream_pos) :]
         else:
             return None
 
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
             self._entity_to_key[entity] = stream_pos
 
             while len(self._cache) > self._max_size:
-                k, r = self._cache.popitem()
-                self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+                k, r = self._cache.popitem(0)
+                self._earliest_known_stream_pos = max(
+                    k, self._earliest_known_stream_pos,
+                )
                 self._entity_to_key.pop(r, None)
 
     def get_max_pos_of_last_change(self, entity):
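
One subtlety in the eviction loop above: sortedcontainers' ``popitem()``
defaults to removing the item with the *largest* key, so the cache must pass
``popitem(0)`` to evict the oldest stream position. A small demonstration with
made-up entries::

    from sortedcontainers import SortedDict

    cache = SortedDict({2: "user@foo.com", 3: "bar@baz.net"})

    k, r = cache.popitem(0)  # index 0 = smallest key = oldest position
    assert (k, r) == (2, "user@foo.com")
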
diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py
new file mode 100644
index 0000000000..67ece166c7
--- /dev/null
+++ b/tests/util/test_stream_change_cache.py
@@ -0,0 +1,198 @@
+from tests import unittest
+from mock import patch
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class StreamChangeCacheTests(unittest.TestCase):
+    """
+    Tests for StreamChangeCache.
+    """
+
+    def test_prefilled_cache(self):
+        """
+        Providing a prefilled cache to StreamChangeCache will result in a cache
+        with the prefilled-cache entered in.
+        """
+        cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
+        self.assertTrue(cache.has_entity_changed("user@foo.com", 1))
+
+    def test_has_entity_changed(self):
+        """
+        StreamChangeCache.entity_has_changed will mark entities as changed, and
+        has_entity_changed will observe the changed entities.
+        """
+        cache = StreamChangeCache("#test", 3)
+
+        cache.entity_has_changed("user@foo.com", 6)
+        cache.entity_has_changed("bar@baz.net", 7)
+
+        # If it's been changed after that stream position, return True
+        self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
+        self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
+
+        # If it's been changed at that stream position, return False
+        self.assertFalse(cache.has_entity_changed("user@foo.com", 6))
+
+        # If there's no changes after that stream position, return False
+        self.assertFalse(cache.has_entity_changed("user@foo.com", 7))
+
+        # If the entity does not exist, return False.
+        self.assertFalse(cache.has_entity_changed("not@here.website", 7))
+
+        # If we request before the stream cache's earliest known position,
+        # return True, whether it's a known entity or not.
+        self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
+        self.assertTrue(cache.has_entity_changed("not@here.website", 0))
+
+    @patch("synapse.util.caches.CACHE_SIZE_FACTOR", 1.0)
+    def test_has_entity_changed_pops_off_start(self):
+        """
+        StreamChangeCache.entity_has_changed will respect the max size and
+        purge the oldest items upon reaching that max size.
+        """
+        cache = StreamChangeCache("#test", 1, max_size=2)
+
+        cache.entity_has_changed("user@foo.com", 2)
+        cache.entity_has_changed("bar@baz.net", 3)
+        cache.entity_has_changed("user@elsewhere.org", 4)
+
+        # The cache is at the max size, 2
+        self.assertEqual(len(cache._cache), 2)
+
+        # The oldest item has been popped off
+        self.assertTrue("user@foo.com" not in cache._entity_to_key)
+
+        # If we update an existing entity, it keeps the two existing entities
+        cache.entity_has_changed("bar@baz.net", 5)
+        self.assertEqual(
+            set(["bar@baz.net", "user@elsewhere.org"]), set(cache._entity_to_key)
+        )
+
+    def test_get_all_entities_changed(self):
+        """
+        StreamChangeCache.get_all_entities_changed will return all changed
+        entities since the given position. If the position is before the start
+        of the known stream, it returns None instead.
+        """
+        cache = StreamChangeCache("#test", 1)
+
+        cache.entity_has_changed("user@foo.com", 2)
+        cache.entity_has_changed("bar@baz.net", 3)
+        cache.entity_has_changed("user@elsewhere.org", 4)
+
+        self.assertEqual(
+            cache.get_all_entities_changed(1),
+            ["user@foo.com", "bar@baz.net", "user@elsewhere.org"],
+        )
+        self.assertEqual(
+            cache.get_all_entities_changed(2), ["bar@baz.net", "user@elsewhere.org"]
+        )
+        self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"])
+        self.assertEqual(cache.get_all_entities_changed(0), None)
+ """ + cache = StreamChangeCache("#test", 1) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + # Query all the entries, but mid-way through the stream. We should only + # get the ones after that point. + self.assertEqual( + cache.get_entities_changed( + ["user@foo.com", "bar@baz.net", "user@elsewhere.org"], stream_pos=2 + ), + set(["bar@baz.net", "user@elsewhere.org"]), + ) + + # Query all the entries mid-way through the stream, but include one + # that doesn't exist in it. We should get back the one that doesn't + # exist, too. + self.assertEqual( + cache.get_entities_changed( + [ + "user@foo.com", + "bar@baz.net", + "user@elsewhere.org", + "not@here.website", + ], + stream_pos=2, + ), + set(["bar@baz.net", "user@elsewhere.org", "not@here.website"]), + ) + + # Query all the entries, but before the first known point. We will get + # all the entries we queried for, including ones that don't exist. + self.assertEqual( + cache.get_entities_changed( + [ + "user@foo.com", + "bar@baz.net", + "user@elsewhere.org", + "not@here.website", + ], + stream_pos=0, + ), + set( + [ + "user@foo.com", + "bar@baz.net", + "user@elsewhere.org", + "not@here.website", + ] + ), + ) + + def test_max_pos(self): + """ + StreamChangeCache.get_max_pos_of_last_change will return the most + recent point where the entity could have changed. If the entity is not + known, the stream start is provided instead. + """ + cache = StreamChangeCache("#test", 1) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + # Known entities will return the point where they were changed. + self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 2) + self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3) + self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4) + + # Unknown entities will return the stream start position. 
+
+    def test_max_pos(self):
+        """
+        StreamChangeCache.get_max_pos_of_last_change will return the most
+        recent point where the entity could have changed. If the entity is not
+        known, the stream start is provided instead.
+        """
+        cache = StreamChangeCache("#test", 1)
+
+        cache.entity_has_changed("user@foo.com", 2)
+        cache.entity_has_changed("bar@baz.net", 3)
+        cache.entity_has_changed("user@elsewhere.org", 4)
+
+        # Known entities will return the point where they were changed.
+        self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 2)
+        self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3)
+        self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4)
+
+        # Unknown entities will return the stream start position.
+        self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), 1)
diff --git a/tox.ini b/tox.ini
index 99b35f399a..5d79098d2f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -52,33 +52,41 @@ commands =
     /usr/bin/find "{toxinidir}" -name '*.pyc' -delete
     coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
         "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config \
-        tests/appservice/test_scheduler.py \
+        tests/api/test_filtering.py \
+        tests/api/test_ratelimiting.py \
+        tests/appservice \
+        tests/crypto \
+        tests/events \
+        tests/handlers/test_appservice.py \
         tests/handlers/test_auth.py \
+        tests/handlers/test_device.py \
+        tests/handlers/test_directory.py \
+        tests/handlers/test_e2e_keys.py \
         tests/handlers/test_presence.py \
+        tests/handlers/test_profile.py \
         tests/handlers/test_register.py \
+        tests/replication/slave/storage/test_account_data.py \
+        tests/replication/slave/storage/test_receipts.py \
         tests/storage/test_appservice.py \
+        tests/storage/test_background_update.py \
         tests/storage/test_base.py \
+        tests/storage/test__base.py \
         tests/storage/test_client_ips.py \
         tests/storage/test_devices.py \
         tests/storage/test_end_to_end_keys.py \
         tests/storage/test_event_push_actions.py \
+        tests/storage/test_keys.py \
+        tests/storage/test_presence.py \
         tests/storage/test_profile.py \
+        tests/storage/test_registration.py \
         tests/storage/test_room.py \
+        tests/storage/test_user_directory.py \
         tests/test_distributor.py \
         tests/test_dns.py \
         tests/test_preview.py \
         tests/test_test_utils.py \
        tests/test_types.py \
-        tests/util/test_dict_cache.py \
-        tests/util/test_expiring_cache.py \
-        tests/util/test_file_consumer.py \
-        tests/util/test_limiter.py \
-        tests/util/test_linearizer.py \
-        tests/util/test_logcontext.py \
-        tests/util/test_logformatter.py \
-        tests/util/test_rwlock.py \
-        tests/util/test_snapshot_cache.py \
-        tests/util/test_wheel_timer.py} \
+        tests/util} \
         {env:TOXSUFFIX:}
     {env:DUMP_COVERAGE_COMMAND:coverage report -m}