diff --git a/.gitignore b/.gitignore
index 7940158c5b..9f42a7568f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,6 +43,7 @@ media_store/
build/
venv/
+venv*/
localhost-800*/
static/client/register/register_config.js
diff --git a/CHANGES.rst b/CHANGES.rst
index 0569b581db..4047f50aa5 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,72 @@
+Changes in synapse v0.31.1 (2018-06-08)
+=======================================
+
+v0.31.1 fixes a security bug in the ``get_missing_events`` federation API
+where event visibility rules were not applied correctly.
+
+We are not aware of it being actively exploited, but please upgrade as soon
+as possible.
+
+Bug Fixes:
+
+* Fix event filtering in get_missing_events handler (PR #3371)
+
+Changes in synapse v0.31.0 (2018-06-06)
+=======================================
+
+The most notable change from v0.30.0 is the switch to the Python Prometheus
+library to improve system stats reporting. WARNING: this changes a number of
+Prometheus metrics in a backwards-incompatible manner. For more details, see
+`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.
+
+Bug Fixes:
+
+* Fix metric documentation tables (PR #3341)
+* Fix LaterGauge error handling (694968f)
+* Fix replication metrics (b7e7fd2)
+
+Changes in synapse v0.31.0-rc1 (2018-06-04)
+===========================================
+
+Features:
+
+* Switch to the Python Prometheus library (PR #3256, #3274)
+* Let users leave the server notice room after joining (PR #3287)
+
+
+Changes:
+
+* Daily user type phone home stats (PR #3264)
+* Use iter* methods for _filter_events_for_server (PR #3267)
+* Docs on consent bits (PR #3268)
+* Remove users from user directory on deactivate (PR #3277)
+* Avoid sending consent notice to guest users (PR #3288)
+* Disable CPUMetrics if no /proc/self/stat (PR #3299)
+* Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
+* Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat!
+* Reduce stuck read-receipts: ignore depth when updating (PR #3318)
+* Put python's logs into Trial when running unit tests (PR #3319)
+
+Changes, Python 3 migration:
+
+* Replace some more comparisons with six (PR #3243) Thanks to @NotAFile!
+* Replace some iteritems with six (PR #3244) Thanks to @NotAFile!
+* Add batch_iter to utils (PR #3245) Thanks to @NotAFile!
+* Use repr, not str (PR #3246) Thanks to @NotAFile!
+* Misc Python3 fixes (PR #3247) Thanks to @NotAFile!
+* Py3 storage/_base.py (PR #3278) Thanks to @NotAFile!
+* More six iteritems (PR #3279) Thanks to @NotAFile!
+* More misc. py3 fixes (PR #3280) Thanks to @NotAFile!
+* Remaining isinstance fixes (PR #3281) Thanks to @NotAFile!
+* Py3-ize state.py (PR #3283) Thanks to @NotAFile!
+* Extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel!
+* Use memoryview in py3 (PR #3303) Thanks to @NotAFile!
+
+Bug Fixes:
+
+* Fix federation backfill bugs (PR #3261)
+* federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
+
+
Changes in synapse v0.30.0 (2018-05-24)
==========================================
diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst
index 25e06bca58..5bbb5a4f3a 100644
--- a/docs/metrics-howto.rst
+++ b/docs/metrics-howto.rst
@@ -63,30 +63,40 @@ The duplicated metrics deprecated in Synapse 0.27.0 have been removed.
All time duration-based metrics have been changed to be seconds. This affects:
-================================
-msec -> sec metrics
-================================
-python_gc_time
-python_twisted_reactor_tick_time
-synapse_storage_query_time
-synapse_storage_schedule_time
-synapse_storage_transaction_time
-================================
++----------------------------------+
+| msec -> sec metrics |
++==================================+
+| python_gc_time |
++----------------------------------+
+| python_twisted_reactor_tick_time |
++----------------------------------+
+| synapse_storage_query_time |
++----------------------------------+
+| synapse_storage_schedule_time |
++----------------------------------+
+| synapse_storage_transaction_time |
++----------------------------------+
Several metrics have been changed to be histograms, which sort entries into
buckets and allow better analysis. The following metrics are now histograms:
-=========================================
-Altered metrics
-=========================================
-python_gc_time
-python_twisted_reactor_pending_calls
-python_twisted_reactor_tick_time
-synapse_http_server_response_time_seconds
-synapse_storage_query_time
-synapse_storage_schedule_time
-synapse_storage_transaction_time
-=========================================
++-------------------------------------------+
+| Altered metrics |
++===========================================+
+| python_gc_time |
++-------------------------------------------+
+| python_twisted_reactor_pending_calls |
++-------------------------------------------+
+| python_twisted_reactor_tick_time |
++-------------------------------------------+
+| synapse_http_server_response_time_seconds |
++-------------------------------------------+
+| synapse_storage_query_time |
++-------------------------------------------+
+| synapse_storage_schedule_time |
++-------------------------------------------+
+| synapse_storage_transaction_time |
++-------------------------------------------+
Block and response metrics renamed for 0.27.0
diff --git a/docs/postgres.rst b/docs/postgres.rst
index 296293e859..2377542296 100644
--- a/docs/postgres.rst
+++ b/docs/postgres.rst
@@ -9,19 +9,19 @@ Set up database
Assuming your PostgreSQL database user is called ``postgres``, create a user
``synapse_user`` with::
- su - postgres
- createuser --pwprompt synapse_user
+ su - postgres
+ createuser --pwprompt synapse_user
The PostgreSQL database used *must* have the correct encoding set, otherwise it
would not be able to store UTF8 strings. To create a database with the correct
encoding use, e.g.::
- CREATE DATABASE synapse
- ENCODING 'UTF8'
- LC_COLLATE='C'
- LC_CTYPE='C'
- template=template0
- OWNER synapse_user;
+ CREATE DATABASE synapse
+ ENCODING 'UTF8'
+ LC_COLLATE='C'
+ LC_CTYPE='C'
+ template=template0
+ OWNER synapse_user;
This would create an appropriate database named ``synapse`` owned by the
``synapse_user`` user (which must already exist).
@@ -126,7 +126,7 @@ run::
--postgres-config homeserver-postgres.yaml
Once that has completed, change the synapse config to point at the PostgreSQL
-database configuration file ``homeserver-postgres.yaml``:
+database configuration file ``homeserver-postgres.yaml``::
./synctl stop
mv homeserver.yaml homeserver-old-sqlite.yaml
diff --git a/setup.cfg b/setup.cfg
index da8eafbb39..fa6f2d1ce4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -17,4 +17,5 @@ ignore =
[flake8]
max-line-length = 90
# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
-ignore = W503
+# E203 is contrary to PEP8.
+ignore = W503,E203
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 5bada5e290..78fc63aa49 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.30.0"
+__version__ = "0.31.1"
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 712dfa870e..56ae086128 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -171,6 +171,10 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+ cache_factors = config.get("synctl_cache_factors", {})
+    for cache_name, factor in cache_factors.items():
+ os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
worker_configfiles = []
if options.worker:
start_stop_synapse = False
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 5fdb579723..d1c598622a 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -292,4 +292,8 @@ class ApplicationService(object):
return self.rate_limited
def __str__(self):
- return "ApplicationService: %s" % (self.__dict__,)
+ # copy dictionary and redact token fields so they don't get logged
+ dict_copy = self.__dict__.copy()
+ dict_copy["token"] = "<redacted>"
+ dict_copy["hs_token"] = "<redacted>"
+ return "ApplicationService: %s" % (dict_copy,)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 00efff1464..47251fb6ad 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -24,8 +24,27 @@ from synapse.types import ThirdPartyInstanceID
import logging
import urllib
+from prometheus_client import Counter
+
logger = logging.getLogger(__name__)
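+# Counters for outgoing appservice transactions. Each is labelled with the
+# application service's id so that per-AS rates can be graphed.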
+sent_transactions_counter = Counter(
+ "synapse_appservice_api_sent_transactions",
+ "Number of /transactions/ requests sent",
+ ["service"]
+)
+
+failed_transactions_counter = Counter(
+ "synapse_appservice_api_failed_transactions",
+ "Number of /transactions/ requests that failed to send",
+ ["service"]
+)
+
+sent_events_counter = Counter(
+ "synapse_appservice_api_sent_events",
+ "Number of events sent to the AS",
+ ["service"]
+)
HOUR_IN_MS = 60 * 60 * 1000
@@ -219,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
args={
"access_token": service.hs_token
})
+ sent_transactions_counter.labels(service.id).inc()
+ sent_events_counter.labels(service.id).inc(len(events))
defer.returnValue(True)
return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_bulk to %s threw exception %s", uri, ex)
+ failed_transactions_counter.labels(service.id).inc()
defer.returnValue(False)
def _serialize(self, events):
diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py
index 8f6ed73328..e22c731aad 100644
--- a/synapse/config/consent_config.py
+++ b/synapse/config/consent_config.py
@@ -18,6 +18,9 @@ from ._base import Config
DEFAULT_CONFIG = """\
# User Consent configuration
#
+# For detailed instructions, see
+# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md
+#
# Parts of this section are required if enabling the 'consent' resource under
# 'listeners', in particular 'template_dir' and 'version'.
#
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 22ee0fc93f..9b17ef0a08 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -27,10 +27,12 @@ from synapse.util.metrics import Measure
from twisted.internet import defer
from signedjson.sign import (
- verify_signed_json, signature_ids, sign_json, encode_canonical_json
+ verify_signed_json, signature_ids, sign_json, encode_canonical_json,
+ SignatureVerifyException,
)
from signedjson.key import (
- is_signing_algorithm_supported, decode_verify_key_bytes
+ is_signing_algorithm_supported, decode_verify_key_bytes,
+ encode_verify_key_base64,
)
from unpaddedbase64 import decode_base64, encode_base64
@@ -56,7 +58,7 @@ Attributes:
key_ids(set(str)): The set of key_ids that could be used to verify the
JSON object
json_object(dict): The JSON object to verify.
- deferred(twisted.internet.defer.Deferred):
+ deferred(Deferred[str, str, nacl.signing.VerifyKey]):
A deferred (server_name, key_id, verify_key) tuple that resolves when
a verify key has been fetched. The deferreds' callbacks are run with no
logcontext.
@@ -736,6 +738,17 @@ class Keyring(object):
@defer.inlineCallbacks
def _handle_key_deferred(verify_request):
+ """Waits for the key to become available, and then performs a verification
+
+ Args:
+ verify_request (VerifyKeyRequest):
+
+ Returns:
+ Deferred[None]
+
+ Raises:
+ SynapseError if there was a problem performing the verification
+ """
server_name = verify_request.server_name
try:
with PreserveLoggingContext():
@@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request):
))
try:
verify_signed_json(json_object, server_name, verify_key)
- except Exception:
+ except SignatureVerifyException as e:
+ logger.debug(
+ "Error verifying signature for %s:%s:%s with key %s: %s",
+ server_name, verify_key.alg, verify_key.version,
+ encode_verify_key_base64(verify_key),
+ str(e),
+ )
raise SynapseError(
401,
- "Invalid signature for server %s with key %s:%s" % (
- server_name, verify_key.alg, verify_key.version
+ "Invalid signature for server %s with key %s:%s: %s" % (
+ server_name, verify_key.alg, verify_key.version, str(e),
),
Codes.UNAUTHORIZED,
)
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 3dcc629d44..1d5c0f3797 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
from synapse.metrics import LaterGauge
-from blist import sorteddict
+from sortedcontainers import SortedDict
from collections import namedtuple
import logging
@@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
- self.presence_changed = sorteddict() # Stream position -> user_id
+ self.presence_changed = SortedDict() # Stream position -> user_id
self.keyed_edu = {} # (destination, key) -> EDU
- self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
+ self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
- self.edus = sorteddict() # stream position -> Edu
+ self.edus = SortedDict() # stream position -> Edu
- self.failures = sorteddict() # stream position -> (destination, Failure)
+ self.failures = SortedDict() # stream position -> (destination, Failure)
- self.device_messages = sorteddict() # stream position -> destination
+ self.device_messages = SortedDict() # stream position -> destination
self.pos = 1
- self.pos_time = sorteddict()
+ self.pos_time = SortedDict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
@@ -75,7 +75,7 @@ class FederationRemoteSendQueue(object):
# changes. ARGH.
def register(name, queue):
LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
- "", lambda: len(queue))
+ "", [], lambda: len(queue))
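+        # (the second and third arguments are the description and the list
+        # of label names; these gauges are unlabelled)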
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
@@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object):
now = self.clock.time_msec()
keys = self.pos_time.keys()
- time = keys.bisect_left(now - FIVE_MINUTES_AGO)
+ time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
if not keys[:time]:
return
@@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object):
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]
@@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object):
# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.keyed_edu_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]
@@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object):
# Delete things out of edu map
keys = self.edus.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.edus.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]
# Delete things out of failure map
keys = self.failures.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.failures.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]
# Delete things out of device map
keys = self.device_messages.keys()
- i = keys.bisect_left(position_to_delete)
+ i = self.device_messages.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]
@@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object):
self._clear_queue_before_pos(federation_ack)
# Fetch changed presence
- keys = self.presence_changed.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
+ i = self.presence_changed.bisect_right(from_token)
+ j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
- for pos in keys[i:j]
- for user_id in self.presence_changed[pos]
+ for pos, user_id_list in self.presence_changed.items()[i:j]
+ for user_id in user_id_list
]
for (key, user_id) in dest_user_ids:
@@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changes keyed edus
- keys = self.keyed_edu_changed.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
+ i = self.keyed_edu_changed.bisect_right(from_token)
+ j = self.keyed_edu_changed.bisect_right(to_token) + 1
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
- keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
+ keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
@@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed edus
- keys = self.edus.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
- edus = ((k, self.edus[k]) for k in keys[i:j])
+ i = self.edus.bisect_right(from_token)
+ j = self.edus.bisect_right(to_token) + 1
+ edus = self.edus.items()[i:j]
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
# Fetch changed failures
- keys = self.failures.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
- failures = ((k, self.failures[k]) for k in keys[i:j])
+ i = self.failures.bisect_right(from_token)
+ j = self.failures.bisect_right(to_token) + 1
+ failures = self.failures.items()[i:j]
for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
@@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed device messages
- keys = self.device_messages.keys()
- i = keys.bisect_right(from_token)
- j = keys.bisect_right(to_token) + 1
- device_messages = {self.device_messages[k]: k for k in keys[i:j]}
+ i = self.device_messages.bisect_right(from_token)
+ j = self.device_messages.bisect_right(to_token) + 1
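+        # invert to destination -> stream position; later positions clobber
+        # earlier ones, so each destination maps to its latest position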
+ device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index fcf94befb7..495ac4c648 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1794,6 +1794,10 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
+ missing_events = yield self._filter_events_for_server(
+ origin, room_id, missing_events,
+ )
+
defer.returnValue(missing_events)
@defer.inlineCallbacks
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 054372e179..58ef8d3ce4 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -13,6 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import re
+
from twisted.internet.defer import CancelledError
from twisted.python import failure
@@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
value.trap(CancelledError)
raise RequestTimedOutError()
return value
+
+
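+# Matches an access_token query parameter, with either a literal underscore
+# or a %5F/%5f URL-encoded one, so that its value can be redacted from logs.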
+ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+
+
+def redact_uri(uri):
+ """Strips access tokens from the uri replaces with <redacted>"""
+ return ACCESS_TOKEN_RE.sub(
+ br'\1<redacted>\3',
+ uri
+ )
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4d4eee3d64..8064a84c5c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -19,7 +19,7 @@ from OpenSSL.SSL import VERIFY_NONE
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
-from synapse.http import cancelled_to_request_timed_out_error
+from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@@ -90,7 +90,8 @@ class SimpleHttpClient(object):
# counters to it
outgoing_requests_counter.labels(method).inc()
- logger.info("Sending request %s %s", method, uri)
+        # log request, redacting any access_token (AS requests include one)
+ logger.info("Sending request %s %s", method, redact_uri(uri))
try:
request_deferred = self.agent.request(
@@ -105,14 +106,14 @@ class SimpleHttpClient(object):
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
- method, uri, response.code
+ method, redact_uri(uri), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
- method, uri, type(e).__name__, e.message
+ method, redact_uri(uri), type(e).__name__, e.message
)
raise e
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 60299657b9..2664006f8c 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,18 +14,16 @@
import contextlib
import logging
-import re
import time
from twisted.web.server import Site, Request
+from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
-
_next_request_seq = 0
@@ -69,10 +67,7 @@ class SynapseRequest(Request):
return "%s-%i" % (self.method, self.request_seq)
def get_redacted_uri(self):
- return ACCESS_TOKEN_RE.sub(
- br'\1<redacted>\3',
- self.uri
- )
+ return redact_uri(self.uri)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 56c0032f91..429e79c472 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -60,10 +60,13 @@ class LaterGauge(object):
try:
calls = self.caller()
- except Exception as e:
- print(e)
- logger.err()
+ except Exception:
+ logger.exception(
+ "Exception running callback for LaterGuage(%s)",
+ self.name,
+ )
yield g
+ return
if isinstance(calls, dict):
for k, v in calls.items():
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 478c497722..faf6dfdb8d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -50,14 +50,16 @@ REQUIREMENTS = {
"bcrypt": ["bcrypt>=3.1.0"],
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
- "blist": ["blist"],
+ "sortedcontainers": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
+ "attr": ["attr"],
}
+
CONDITIONAL_REQUIREMENTS = {
"web_client": {
"matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index a6280aae70..c870475cd1 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -622,7 +622,7 @@ tcp_inbound_commands = LaterGauge(
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
- for k, count in iteritems(p.inbound_commands_counter.counts)
+ for k, count in iteritems(p.inbound_commands_counter)
})
tcp_outbound_commands = LaterGauge(
@@ -630,7 +630,7 @@ tcp_outbound_commands = LaterGauge(
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
- for k, count in iteritems(p.outbound_commands_counter.counts)
+ for k, count in iteritems(p.outbound_commands_counter)
})
# number of updates received for each RDATA stream
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 6835a7bba2..b8665a45eb 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
yield self.store.find_first_stream_ordering_after_ts(ts)
)
- room_event_after_stream_ordering = (
+ r = (
yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
)
- if room_event_after_stream_ordering:
- token = yield self.store.get_topological_token_for_event(
- room_event_after_stream_ordering,
- )
- else:
+ if not r:
logger.warn(
"[purge] purging events not possible: No event found "
"(received_ts %i => stream_ordering %i)",
@@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
"there is no event to be purged",
errcode=Codes.NOT_FOUND,
)
+ (stream, topo, _event_id) = r
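+        # build a topological token of the form "t<topological>-<stream>"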
+ token = "t%d-%d" % (topo, stream)
logger.info(
- "[purge] purging up to token %d (received_ts %i => "
+ "[purge] purging up to token %s (received_ts %i => "
"stream_ordering %i)",
token, ts, stream_ordering,
)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7bfc3d91b5..48a88f755e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
txn.execute(sql, (user_id, room_id))
- txn.call_after(self.was_forgotten_at.invalidate_all)
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
self._invalidate_cache_and_stream(
txn, self.who_forgot_in_room, (room_id,)
@@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
count = yield self.runInteraction("did_forget_membership", f)
defer.returnValue(count == 0)
- @cachedInlineCallbacks(num_args=3)
- def was_forgotten_at(self, user_id, room_id, event_id):
- """Returns whether user_id has elected to discard history for room_id at
- event_id.
-
- event_id must be a membership event."""
- def f(txn):
- sql = (
- "SELECT"
- " forgotten"
- " FROM"
- " room_memberships"
- " WHERE"
- " user_id = ?"
- " AND"
- " room_id = ?"
- " AND"
- " event_id = ?"
- )
- txn.execute(sql, (user_id, room_id, event_id))
- rows = txn.fetchall()
- return rows[0][0]
- forgot = yield self.runInteraction("did_forget_membership_at", f)
- defer.returnValue(forgot == 1)
-
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 3b87d981b5..b452813fbb 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
-from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
+from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
@@ -57,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
- "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
+ "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
)
@cached(max_entries=100000, iterable=True)
@@ -305,7 +305,7 @@ class StateGroupWorkerStore(SQLBaseStore):
for typ in types:
if typ[1] is None:
where_clauses.append("(type = ?)")
- where_args.extend(typ[0])
+ where_args.append(typ[0])
wildcard_types = True
else:
where_clauses.append("(type = ? AND state_key = ?)")
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 183faf75a1..900575eb3c 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -22,6 +22,16 @@ import six
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
+
+def get_cache_factor_for(cache_name):
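+    """Returns the cache factor to use for the named cache.
+
+    A per-cache override may be given in the SYNAPSE_CACHE_FACTOR_<NAME>
+    environment variable (e.g. SYNAPSE_CACHE_FACTOR_STATEGROUPCACHE for
+    "stateGroupCache"); otherwise the global CACHE_SIZE_FACTOR applies.
+    """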
+ env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
+ factor = os.environ.get(env_var)
+ if factor:
+ return float(factor)
+
+ return CACHE_SIZE_FACTOR
+
+
caches_by_name = {}
collectors_by_name = {}
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index fc1874b65b..65a1042de1 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -17,7 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
-from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@@ -313,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
cache_context=cache_context)
- max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+ max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
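+        # the per-cache factor is keyed on the decorated function's name,
+        # e.g. a cached function named get_users_in_room would be tuned via
+        # SYNAPSE_CACHE_FACTOR_GET_USERS_IN_ROOM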
self.max_entries = max_entries
self.tree = tree
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index a7fe0397fa..817118e30f 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util import caches
-from blist import sorteddict
+from sortedcontainers import SortedDict
import logging
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
entities that may have changed since that position. If position key is too
old then the cache will simply return all given entities.
"""
- def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
- self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+ def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+ self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {}
- self._cache = sorteddict()
+ self._cache = SortedDict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
- self.metrics = register_cache("cache", self.name, self._cache)
+ self.metrics = caches.register_cache("cache", self.name, self._cache)
- for entity, stream_pos in prefilled_cache.items():
- self.entity_has_changed(entity, stream_pos)
+ if prefilled_cache:
+ for entity, stream_pos in prefilled_cache.items():
+ self.entity_has_changed(entity, stream_pos)
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
return False
def get_entities_changed(self, entities, stream_pos):
- """Returns subset of entities that have had new things since the
- given position. If the position is too old it will just return the given list.
+ """
+        Returns the subset of the given entities which may have changed since
+        the given position. Entities unknown to the cache are always included.
+        If the position is too old, all of the given entities are returned.
"""
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
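+            # entities the cache has never seen could have changed at any
+            # point, so they must always be included in the result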
+ not_known_entities = set(entities) - set(self._entity_to_key)
- result = set(
- self._cache[k] for k in keys[i:]
- ).intersection(entities)
+ result = (
+ set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+ .intersection(entities)
+ .union(not_known_entities)
+ )
self.metrics.inc_hits()
else:
- result = entities
+ result = set(entities)
self.metrics.inc_misses()
return result
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
"""
assert type(stream_pos) is int
+ if not self._cache:
+ # If we have no cache, nothing can have changed.
+ return False
+
if stream_pos >= self._earliest_known_stream_pos:
self.metrics.inc_hits()
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
-
- return i < len(keys)
+ return self._cache.bisect_right(stream_pos) < len(self._cache)
else:
self.metrics.inc_misses()
return True
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
-
- return [self._cache[k] for k in keys[i:]]
+ return self._cache.values()[self._cache.bisect_right(stream_pos) :]
else:
return None
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
self._entity_to_key[entity] = stream_pos
while len(self._cache) > self._max_size:
- k, r = self._cache.popitem()
- self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
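+            # SortedDict.popitem(0) removes the entry with the smallest key,
+            # i.e. the oldest stream position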
+ k, r = self._cache.popitem(0)
+ self._earliest_known_stream_pos = max(
+ k, self._earliest_known_stream_pos,
+ )
self._entity_to_key.pop(r, None)
def get_max_pos_of_last_change(self, entity):
diff --git a/tests/unittest.py b/tests/unittest.py
index 7b478c4294..184fe880f3 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -12,23 +12,37 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import logging
+
import twisted
+import twisted.logger
from twisted.trial import unittest
-import logging
+from synapse.util.logcontext import LoggingContextFilter
+
+# Set up putting Synapse's logs into Trial's.
+rootLogger = logging.getLogger()
+
+log_format = (
+ "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s"
+)
+
+
+class ToTwistedHandler(logging.Handler):
+ tx_log = twisted.logger.Logger()
+
+ def emit(self, record):
+ log_entry = self.format(record)
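+        # twisted.logger uses "warn" where stdlib logging uses "warning"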
+ log_level = record.levelname.lower().replace('warning', 'warn')
+ self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry)
-# logging doesn't have a "don't log anything at all EVARRRR setting,
-# but since the highest value is 50, 1000000 should do ;)
-NEVER = 1000000
-handler = logging.StreamHandler()
-handler.setFormatter(logging.Formatter(
- "%(levelname)s:%(name)s:%(message)s [%(pathname)s:%(lineno)d]"
-))
-logging.getLogger().addHandler(handler)
-logging.getLogger().setLevel(NEVER)
-logging.getLogger("synapse.storage.SQL").setLevel(NEVER)
-logging.getLogger("synapse.storage.txn").setLevel(NEVER)
+handler = ToTwistedHandler()
+formatter = logging.Formatter(log_format)
+handler.setFormatter(formatter)
+handler.addFilter(LoggingContextFilter(request=""))
+rootLogger.addHandler(handler)
def around(target):
@@ -61,7 +75,7 @@ class TestCase(unittest.TestCase):
method = getattr(self, methodName)
- level = getattr(method, "loglevel", getattr(self, "loglevel", NEVER))
+ level = getattr(method, "loglevel", getattr(self, "loglevel", logging.ERROR))
@around(self)
def setUp(orig):
diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py
new file mode 100644
index 0000000000..67ece166c7
--- /dev/null
+++ b/tests/util/test_stream_change_cache.py
@@ -0,0 +1,198 @@
+from tests import unittest
+from mock import patch
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class StreamChangeCacheTests(unittest.TestCase):
+ """
+ Tests for StreamChangeCache.
+ """
+
+ def test_prefilled_cache(self):
+ """
+ Providing a prefilled cache to StreamChangeCache will result in a cache
+ with the prefilled-cache entered in.
+ """
+ cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
+ self.assertTrue(cache.has_entity_changed("user@foo.com", 1))
+
+ def test_has_entity_changed(self):
+ """
+ StreamChangeCache.entity_has_changed will mark entities as changed, and
+ has_entity_changed will observe the changed entities.
+ """
+ cache = StreamChangeCache("#test", 3)
+
+ cache.entity_has_changed("user@foo.com", 6)
+ cache.entity_has_changed("bar@baz.net", 7)
+
+ # If it's been changed after that stream position, return True
+ self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
+ self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
+
+ # If it's been changed at that stream position, return False
+ self.assertFalse(cache.has_entity_changed("user@foo.com", 6))
+
+ # If there's no changes after that stream position, return False
+ self.assertFalse(cache.has_entity_changed("user@foo.com", 7))
+
+ # If the entity does not exist, return False.
+ self.assertFalse(cache.has_entity_changed("not@here.website", 7))
+
+ # If we request before the stream cache's earliest known position,
+ # return True, whether it's a known entity or not.
+ self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
+ self.assertTrue(cache.has_entity_changed("not@here.website", 0))
+
+ @patch("synapse.util.caches.CACHE_SIZE_FACTOR", 1.0)
+ def test_has_entity_changed_pops_off_start(self):
+ """
+ StreamChangeCache.entity_has_changed will respect the max size and
+ purge the oldest items upon reaching that max size.
+ """
+ cache = StreamChangeCache("#test", 1, max_size=2)
+
+ cache.entity_has_changed("user@foo.com", 2)
+ cache.entity_has_changed("bar@baz.net", 3)
+ cache.entity_has_changed("user@elsewhere.org", 4)
+
+ # The cache is at the max size, 2
+ self.assertEqual(len(cache._cache), 2)
+
+ # The oldest item has been popped off
+ self.assertTrue("user@foo.com" not in cache._entity_to_key)
+
+ # If we update an existing entity, it keeps the two existing entities
+ cache.entity_has_changed("bar@baz.net", 5)
+ self.assertEqual(
+ set(["bar@baz.net", "user@elsewhere.org"]), set(cache._entity_to_key)
+ )
+
+ def test_get_all_entities_changed(self):
+ """
+ StreamChangeCache.get_all_entities_changed will return all changed
+ entities since the given position. If the position is before the start
+ of the known stream, it returns None instead.
+ """
+ cache = StreamChangeCache("#test", 1)
+
+ cache.entity_has_changed("user@foo.com", 2)
+ cache.entity_has_changed("bar@baz.net", 3)
+ cache.entity_has_changed("user@elsewhere.org", 4)
+
+ self.assertEqual(
+ cache.get_all_entities_changed(1),
+ ["user@foo.com", "bar@baz.net", "user@elsewhere.org"],
+ )
+ self.assertEqual(
+ cache.get_all_entities_changed(2), ["bar@baz.net", "user@elsewhere.org"]
+ )
+ self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"])
+ self.assertEqual(cache.get_all_entities_changed(0), None)
+
+ def test_has_any_entity_changed(self):
+ """
+ StreamChangeCache.has_any_entity_changed will return True if any
+ entities have been changed since the provided stream position, and
+ False if they have not. If the cache has entries and the provided
+ stream position is before it, it will return True, otherwise False if
+ the cache has no entries.
+ """
+ cache = StreamChangeCache("#test", 1)
+
+ # With no entities, it returns False for the past, present, and future.
+ self.assertFalse(cache.has_any_entity_changed(0))
+ self.assertFalse(cache.has_any_entity_changed(1))
+ self.assertFalse(cache.has_any_entity_changed(2))
+
+ # We add an entity
+ cache.entity_has_changed("user@foo.com", 2)
+
+ # With an entity, it returns True for the past, the stream start
+ # position, and False for the stream position the entity was changed
+ # on and ones after it.
+ self.assertTrue(cache.has_any_entity_changed(0))
+ self.assertTrue(cache.has_any_entity_changed(1))
+ self.assertFalse(cache.has_any_entity_changed(2))
+ self.assertFalse(cache.has_any_entity_changed(3))
+
+ def test_get_entities_changed(self):
+ """
+ StreamChangeCache.get_entities_changed will return the entities in the
+ given list that have changed since the provided stream ID. If the
+ stream position is earlier than the earliest known position, it will
+ return all of the entities queried for.
+ """
+ cache = StreamChangeCache("#test", 1)
+
+ cache.entity_has_changed("user@foo.com", 2)
+ cache.entity_has_changed("bar@baz.net", 3)
+ cache.entity_has_changed("user@elsewhere.org", 4)
+
+ # Query all the entries, but mid-way through the stream. We should only
+ # get the ones after that point.
+ self.assertEqual(
+ cache.get_entities_changed(
+ ["user@foo.com", "bar@baz.net", "user@elsewhere.org"], stream_pos=2
+ ),
+ set(["bar@baz.net", "user@elsewhere.org"]),
+ )
+
+ # Query all the entries mid-way through the stream, but include one
+ # that doesn't exist in it. We should get back the one that doesn't
+ # exist, too.
+ self.assertEqual(
+ cache.get_entities_changed(
+ [
+ "user@foo.com",
+ "bar@baz.net",
+ "user@elsewhere.org",
+ "not@here.website",
+ ],
+ stream_pos=2,
+ ),
+ set(["bar@baz.net", "user@elsewhere.org", "not@here.website"]),
+ )
+
+ # Query all the entries, but before the first known point. We will get
+ # all the entries we queried for, including ones that don't exist.
+ self.assertEqual(
+ cache.get_entities_changed(
+ [
+ "user@foo.com",
+ "bar@baz.net",
+ "user@elsewhere.org",
+ "not@here.website",
+ ],
+ stream_pos=0,
+ ),
+ set(
+ [
+ "user@foo.com",
+ "bar@baz.net",
+ "user@elsewhere.org",
+ "not@here.website",
+ ]
+ ),
+ )
+
+ def test_max_pos(self):
+ """
+ StreamChangeCache.get_max_pos_of_last_change will return the most
+ recent point where the entity could have changed. If the entity is not
+ known, the stream start is provided instead.
+ """
+ cache = StreamChangeCache("#test", 1)
+
+ cache.entity_has_changed("user@foo.com", 2)
+ cache.entity_has_changed("bar@baz.net", 3)
+ cache.entity_has_changed("user@elsewhere.org", 4)
+
+ # Known entities will return the point where they were changed.
+ self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 2)
+ self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3)
+ self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4)
+
+ # Unknown entities will return the stream start position.
+ self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), 1)
diff --git a/tox.ini b/tox.ini
index 99b35f399a..5d79098d2f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -52,33 +52,41 @@ commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
"{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config \
- tests/appservice/test_scheduler.py \
+ tests/api/test_filtering.py \
+ tests/api/test_ratelimiting.py \
+ tests/appservice \
+ tests/crypto \
+ tests/events \
+ tests/handlers/test_appservice.py \
tests/handlers/test_auth.py \
+ tests/handlers/test_device.py \
+ tests/handlers/test_directory.py \
+ tests/handlers/test_e2e_keys.py \
tests/handlers/test_presence.py \
+ tests/handlers/test_profile.py \
tests/handlers/test_register.py \
+ tests/replication/slave/storage/test_account_data.py \
+ tests/replication/slave/storage/test_receipts.py \
tests/storage/test_appservice.py \
+ tests/storage/test_background_update.py \
tests/storage/test_base.py \
+ tests/storage/test__base.py \
tests/storage/test_client_ips.py \
tests/storage/test_devices.py \
tests/storage/test_end_to_end_keys.py \
tests/storage/test_event_push_actions.py \
+ tests/storage/test_keys.py \
+ tests/storage/test_presence.py \
tests/storage/test_profile.py \
+ tests/storage/test_registration.py \
tests/storage/test_room.py \
+ tests/storage/test_user_directory.py \
tests/test_distributor.py \
tests/test_dns.py \
tests/test_preview.py \
tests/test_test_utils.py \
tests/test_types.py \
- tests/util/test_dict_cache.py \
- tests/util/test_expiring_cache.py \
- tests/util/test_file_consumer.py \
- tests/util/test_limiter.py \
- tests/util/test_linearizer.py \
- tests/util/test_logcontext.py \
- tests/util/test_logformatter.py \
- tests/util/test_rwlock.py \
- tests/util/test_snapshot_cache.py \
- tests/util/test_wheel_timer.py} \
+ tests/util} \
{env:TOXSUFFIX:}
{env:DUMP_COVERAGE_COMMAND:coverage report -m}
|