summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--CHANGES.rst58
-rw-r--r--docs/metrics-howto.rst50
-rw-r--r--docs/postgres.rst18
-rw-r--r--setup.cfg3
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/appservice/__init__.py6
-rw-r--r--synapse/config/consent_config.py3
-rw-r--r--synapse/federation/send_queue.py63
-rw-r--r--synapse/http/__init__.py13
-rw-r--r--synapse/http/client.py9
-rw-r--r--synapse/http/site.py9
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/rest/client/v1/admin.py12
-rw-r--r--synapse/storage/state.py2
-rw-r--r--synapse/util/caches/stream_change_cache.py57
-rw-r--r--tests/util/test_stream_change_cache.py198
-rw-r--r--tox.ini30
18 files changed, 415 insertions, 123 deletions
diff --git a/.gitignore b/.gitignore
index 7940158c5b..9f42a7568f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,6 +43,7 @@ media_store/
 
 build/
 venv/
+venv*/
 
 localhost-800*/
 static/client/register/register_config.js
diff --git a/CHANGES.rst b/CHANGES.rst
index 0569b581db..f2b7f04097 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,61 @@
+Changes in synapse v0.31.0 (2018-06-06)
+=======================================
+
+Most notable change from v0.30.0 is to switch to python prometheus library to improve system
+stats reporting. WARNING this changes a number of prometheus metrics in a
+backwards-incompatible manner. For more details, see
+`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.
+
+Bug Fixes:
+
+* Fix metric documentation tables (PR #3341)
+* Fix LaterGuage error handling (694968f)
+* Fix replication metrics (b7e7fd2)
+
+Changes in synapse v0.31.0-rc1 (2018-06-04)
+==========================================
+
+Features:
+
+* Switch to the Python Prometheus library (PR #3256, #3274)
+* Let users leave the server notice room after joining (PR #3287)
+
+
+Changes:
+
+* daily user type phone home stats (PR #3264)
+* Use iter* methods for _filter_events_for_server (PR #3267)
+* Docs on consent bits (PR #3268)
+* Remove users from user directory on deactivate (PR #3277)
+* Avoid sending consent notice to guest users (PR #3288)
+* disable CPUMetrics if no /proc/self/stat (PR #3299)
+* Add local and loopback IPv6 addresses to url_preview_ip_range_blacklist (PR #3312) Thanks to @thegcat!
+* Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
+* Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat!
+* Reduce stuck read-receipts: ignore depth when updating (PR #3318)
+* Put python's logs into Trial when running unit tests (PR #3319)
+
+Changes, python 3 migration:
+
+* Replace some more comparisons with six (PR #3243) Thanks to @NotAFile!
+* replace some iteritems with six (PR #3244) Thanks to @NotAFile!
+* Add batch_iter to utils (PR #3245) Thanks to @NotAFile!
+* use repr, not str (PR #3246) Thanks to @NotAFile!
+* Misc Python3 fixes (PR #3247) Thanks to @NotAFile!
+* Py3 storage/_base.py (PR #3278) Thanks to @NotAFile!
+* more six iteritems (PR #3279) Thanks to @NotAFile!
+* More Misc. py3 fixes (PR #3280) Thanks to @NotAFile!
+* remaining isintance fixes (PR #3281) Thanks to @NotAFile!
+* py3-ize state.py (PR #3283) Thanks to @NotAFile!
+* extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel!
+* use memoryview in py3 (PR #3303) Thanks to @NotAFile!
+
+Bugs:
+
+* Fix federation backfill bugs (PR #3261)
+* federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
+
+
 Changes in synapse v0.30.0 (2018-05-24)
 ==========================================
 
diff --git a/docs/metrics-howto.rst b/docs/metrics-howto.rst
index 25e06bca58..5bbb5a4f3a 100644
--- a/docs/metrics-howto.rst
+++ b/docs/metrics-howto.rst
@@ -63,30 +63,40 @@ The duplicated metrics deprecated in Synapse 0.27.0 have been removed.
 
 All time duration-based metrics have been changed to be seconds. This affects:
 
-================================
-msec -> sec metrics
-================================
-python_gc_time
-python_twisted_reactor_tick_time
-synapse_storage_query_time
-synapse_storage_schedule_time
-synapse_storage_transaction_time
-================================
++----------------------------------+
+| msec -> sec metrics              |
++==================================+
+| python_gc_time                   |
++----------------------------------+
+| python_twisted_reactor_tick_time |
++----------------------------------+
+| synapse_storage_query_time       |
++----------------------------------+
+| synapse_storage_schedule_time    |
++----------------------------------+
+| synapse_storage_transaction_time |
++----------------------------------+
 
 Several metrics have been changed to be histograms, which sort entries into
 buckets and allow better analysis. The following metrics are now histograms:
 
-=========================================
-Altered metrics
-=========================================
-python_gc_time
-python_twisted_reactor_pending_calls
-python_twisted_reactor_tick_time
-synapse_http_server_response_time_seconds
-synapse_storage_query_time
-synapse_storage_schedule_time
-synapse_storage_transaction_time
-=========================================
++-------------------------------------------+
+| Altered metrics                           |
++===========================================+
+| python_gc_time                            |
++-------------------------------------------+
+| python_twisted_reactor_pending_calls      |
++-------------------------------------------+
+| python_twisted_reactor_tick_time          |
++-------------------------------------------+
+| synapse_http_server_response_time_seconds |
++-------------------------------------------+
+| synapse_storage_query_time                |
++-------------------------------------------+
+| synapse_storage_schedule_time             |
++-------------------------------------------+
+| synapse_storage_transaction_time          |
++-------------------------------------------+
 
 
 Block and response metrics renamed for 0.27.0
diff --git a/docs/postgres.rst b/docs/postgres.rst
index 296293e859..2377542296 100644
--- a/docs/postgres.rst
+++ b/docs/postgres.rst
@@ -9,19 +9,19 @@ Set up database
 Assuming your PostgreSQL database user is called ``postgres``, create a user
 ``synapse_user`` with::
 
- su - postgres
- createuser --pwprompt synapse_user
+   su - postgres
+   createuser --pwprompt synapse_user
 
 The PostgreSQL database used *must* have the correct encoding set, otherwise it
 would not be able to store UTF8 strings. To create a database with the correct
 encoding use, e.g.::
 
- CREATE DATABASE synapse
-  ENCODING 'UTF8'
-  LC_COLLATE='C'
-  LC_CTYPE='C'
-  template=template0
-  OWNER synapse_user;
+   CREATE DATABASE synapse
+    ENCODING 'UTF8'
+    LC_COLLATE='C'
+    LC_CTYPE='C'
+    template=template0
+    OWNER synapse_user;
 
 This would create an appropriate database named ``synapse`` owned by the
 ``synapse_user`` user (which must already exist).
@@ -126,7 +126,7 @@ run::
         --postgres-config homeserver-postgres.yaml
 
 Once that has completed, change the synapse config to point at the PostgreSQL
-database configuration file ``homeserver-postgres.yaml``:
+database configuration file ``homeserver-postgres.yaml``::
 
     ./synctl stop
     mv homeserver.yaml homeserver-old-sqlite.yaml 
diff --git a/setup.cfg b/setup.cfg
index da8eafbb39..fa6f2d1ce4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -17,4 +17,5 @@ ignore =
 [flake8]
 max-line-length = 90
 #  W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
-ignore = W503
+#  E203 is contrary to PEP8.
+ignore = W503,E203
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 5bada5e290..ca113db434 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.30.0"
+__version__ = "0.31.0"
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 5fdb579723..d1c598622a 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -292,4 +292,8 @@ class ApplicationService(object):
         return self.rate_limited
 
     def __str__(self):
-        return "ApplicationService: %s" % (self.__dict__,)
+        # copy dictionary and redact token fields so they don't get logged
+        dict_copy = self.__dict__.copy()
+        dict_copy["token"] = "<redacted>"
+        dict_copy["hs_token"] = "<redacted>"
+        return "ApplicationService: %s" % (dict_copy,)
diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py
index 8f6ed73328..e22c731aad 100644
--- a/synapse/config/consent_config.py
+++ b/synapse/config/consent_config.py
@@ -18,6 +18,9 @@ from ._base import Config
 DEFAULT_CONFIG = """\
 # User Consent configuration
 #
+# for detailed instructions, see
+# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md
+#
 # Parts of this section are required if enabling the 'consent' resource under
 # 'listeners', in particular 'template_dir' and 'version'.
 #
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 9f1142b5a9..1d5c0f3797 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 from synapse.metrics import LaterGauge
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 from collections import namedtuple
 
 import logging
@@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object):
         self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = sorteddict()  # Stream position -> user_id
+        self.presence_changed = SortedDict()  # Stream position -> user_id
 
         self.keyed_edu = {}  # (destination, key) -> EDU
-        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
+        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()  # stream position -> Edu
+        self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = sorteddict()  # stream position -> (destination, Failure)
+        self.failures = SortedDict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()  # stream position -> destination
+        self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
-        self.pos_time = sorteddict()
+        self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
@@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object):
         now = self.clock.time_msec()
 
         keys = self.pos_time.keys()
-        time = keys.bisect_left(now - FIVE_MINUTES_AGO)
+        time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
         if not keys[:time]:
             return
 
@@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object):
         with Measure(self.clock, "send_queue._clear"):
             # Delete things out of presence maps
             keys = self.presence_changed.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.presence_changed.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.presence_changed[key]
 
@@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object):
 
             # Delete things out of keyed edus
             keys = self.keyed_edu_changed.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.keyed_edu_changed.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.keyed_edu_changed[key]
 
@@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object):
 
             # Delete things out of edu map
             keys = self.edus.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.edus.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.edus[key]
 
             # Delete things out of failure map
             keys = self.failures.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.failures.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.failures[key]
 
             # Delete things out of device map
             keys = self.device_messages.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.device_messages.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.device_messages[key]
 
@@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object):
             self._clear_queue_before_pos(federation_ack)
 
         # Fetch changed presence
-        keys = self.presence_changed.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
+        i = self.presence_changed.bisect_right(from_token)
+        j = self.presence_changed.bisect_right(to_token) + 1
         dest_user_ids = [
             (pos, user_id)
-            for pos in keys[i:j]
-            for user_id in self.presence_changed[pos]
+            for pos, user_id_list in self.presence_changed.items()[i:j]
+            for user_id in user_id_list
         ]
 
         for (key, user_id) in dest_user_ids:
@@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changes keyed edus
-        keys = self.keyed_edu_changed.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
+        i = self.keyed_edu_changed.bisect_right(from_token)
+        j = self.keyed_edu_changed.bisect_right(to_token) + 1
         # We purposefully clobber based on the key here, python dict comprehensions
         # always use the last value, so this will correctly point to the last
         # stream position.
-        keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
+        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
 
         for ((destination, edu_key), pos) in iteritems(keyed_edus):
             rows.append((pos, KeyedEduRow(
@@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changed edus
-        keys = self.edus.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        edus = ((k, self.edus[k]) for k in keys[i:j])
+        i = self.edus.bisect_right(from_token)
+        j = self.edus.bisect_right(to_token) + 1
+        edus = self.edus.items()[i:j]
 
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
         # Fetch changed failures
-        keys = self.failures.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        failures = ((k, self.failures[k]) for k in keys[i:j])
+        i = self.failures.bisect_right(from_token)
+        j = self.failures.bisect_right(to_token) + 1
+        failures = self.failures.items()[i:j]
 
         for (pos, (destination, failure)) in failures:
             rows.append((pos, FailureRow(
@@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changed device messages
-        keys = self.device_messages.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        device_messages = {self.device_messages[k]: k for k in keys[i:j]}
+        i = self.device_messages.bisect_right(from_token)
+        j = self.device_messages.bisect_right(to_token) + 1
+        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
 
         for (destination, pos) in iteritems(device_messages):
             rows.append((pos, DeviceRow(
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 054372e179..58ef8d3ce4 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -13,6 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import re
+
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
 
@@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
         value.trap(CancelledError)
         raise RequestTimedOutError()
     return value
+
+
+ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+
+
+def redact_uri(uri):
+    """Strips access tokens from the uri replaces with <redacted>"""
+    return ACCESS_TOKEN_RE.sub(
+        br'\1<redacted>\3',
+        uri
+    )
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4d4eee3d64..8064a84c5c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -19,7 +19,7 @@ from OpenSSL.SSL import VERIFY_NONE
 from synapse.api.errors import (
     CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
 )
-from synapse.http import cancelled_to_request_timed_out_error
+from synapse.http import cancelled_to_request_timed_out_error, redact_uri
 from synapse.util.async import add_timeout_to_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
@@ -90,7 +90,8 @@ class SimpleHttpClient(object):
         # counters to it
         outgoing_requests_counter.labels(method).inc()
 
-        logger.info("Sending request %s %s", method, uri)
+        # log request but strip `access_token` (AS requests for example include this)
+        logger.info("Sending request %s %s", method, redact_uri(uri))
 
         try:
             request_deferred = self.agent.request(
@@ -105,14 +106,14 @@ class SimpleHttpClient(object):
             incoming_responses_counter.labels(method, response.code).inc()
             logger.info(
                 "Received response to  %s %s: %s",
-                method, uri, response.code
+                method, redact_uri(uri), response.code
             )
             defer.returnValue(response)
         except Exception as e:
             incoming_responses_counter.labels(method, "ERR").inc()
             logger.info(
                 "Error sending request to  %s %s: %s %s",
-                method, uri, type(e).__name__, e.message
+                method, redact_uri(uri), type(e).__name__, e.message
             )
             raise e
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 60299657b9..2664006f8c 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,18 +14,16 @@
 
 import contextlib
 import logging
-import re
 import time
 
 from twisted.web.server import Site, Request
 
+from synapse.http import redact_uri
 from synapse.http.request_metrics import RequestMetrics
 from synapse.util.logcontext import LoggingContext
 
 logger = logging.getLogger(__name__)
 
-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
-
 _next_request_seq = 0
 
 
@@ -69,10 +67,7 @@ class SynapseRequest(Request):
         return "%s-%i" % (self.method, self.request_seq)
 
     def get_redacted_uri(self):
-        return ACCESS_TOKEN_RE.sub(
-            br'\1<redacted>\3',
-            self.uri
-        )
+        return redact_uri(self.uri)
 
     def get_user_agent(self):
         return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 478c497722..faf6dfdb8d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -50,14 +50,16 @@ REQUIREMENTS = {
     "bcrypt": ["bcrypt>=3.1.0"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
-    "blist": ["blist"],
+    "sortedcontainers": ["sortedcontainers"],
     "pysaml2>=3.0.0": ["saml2>=3.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
     "six": ["six"],
     "prometheus_client": ["prometheus_client"],
+    "attr": ["attr"],
 }
+
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
         "matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 6835a7bba2..b8665a45eb 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 yield self.store.find_first_stream_ordering_after_ts(ts)
             )
 
-            room_event_after_stream_ordering = (
+            r = (
                 yield self.store.get_room_event_after_stream_ordering(
                     room_id, stream_ordering,
                 )
             )
-            if room_event_after_stream_ordering:
-                token = yield self.store.get_topological_token_for_event(
-                    room_event_after_stream_ordering,
-                )
-            else:
+            if not r:
                 logger.warn(
                     "[purge] purging events not possible: No event found "
                     "(received_ts %i => stream_ordering %i)",
@@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                     "there is no event to be purged",
                     errcode=Codes.NOT_FOUND,
                 )
+            (stream, topo, _event_id) = r
+            token = "t%d-%d" % (topo, stream)
             logger.info(
-                "[purge] purging up to token %d (received_ts %i => "
+                "[purge] purging up to token %s (received_ts %i => "
                 "stream_ordering %i)",
                 token, ts, stream_ordering,
             )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c11bc52177..85b8ec2b8f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -272,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                 for typ in types:
                     if typ[1] is None:
                         where_clauses.append("(type = ?)")
-                        where_args.extend(typ[0])
+                        where_args.append(typ[0])
                         wildcard_types = True
                     else:
                         where_clauses.append("(type = ? AND state_key = ?)")
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index a7fe0397fa..817118e30f 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util import caches
 
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 import logging
 
 
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
     entities that may have changed since that position. If position key is too
     old then the cache will simply return all given entities.
     """
-    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+        self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = sorteddict()
+        self._cache = SortedDict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        self.metrics = register_cache("cache", self.name, self._cache)
+        self.metrics = caches.register_cache("cache", self.name, self._cache)
 
-        for entity, stream_pos in prefilled_cache.items():
-            self.entity_has_changed(entity, stream_pos)
+        if prefilled_cache:
+            for entity, stream_pos in prefilled_cache.items():
+                self.entity_has_changed(entity, stream_pos)
 
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
         return False
 
     def get_entities_changed(self, entities, stream_pos):
-        """Returns subset of entities that have had new things since the
-        given position. If the position is too old it will just return the given list.
+        """
+        Returns subset of entities that have had new things since the given
+        position.  Entities unknown to the cache will be returned.  If the
+        position is too old it will just return the given list.
         """
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
+            not_known_entities = set(entities) - set(self._entity_to_key)
 
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(entities)
+            result = (
+                set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+                .intersection(entities)
+                .union(not_known_entities)
+            )
 
             self.metrics.inc_hits()
         else:
-            result = entities
+            result = set(entities)
             self.metrics.inc_misses()
 
         return result
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
         """
         assert type(stream_pos) is int
 
+        if not self._cache:
+            # If we have no cache, nothing can have changed.
+            return False
+
         if stream_pos >= self._earliest_known_stream_pos:
             self.metrics.inc_hits()
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return i < len(keys)
+            return self._cache.bisect_right(stream_pos) < len(self._cache)
         else:
             self.metrics.inc_misses()
             return True
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return [self._cache[k] for k in keys[i:]]
+            return self._cache.values()[self._cache.bisect_right(stream_pos) :]
         else:
             return None
 
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
             self._entity_to_key[entity] = stream_pos
 
             while len(self._cache) > self._max_size:
-                k, r = self._cache.popitem()
-                self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+                k, r = self._cache.popitem(0)
+                self._earliest_known_stream_pos = max(
+                    k, self._earliest_known_stream_pos,
+                )
                 self._entity_to_key.pop(r, None)
 
     def get_max_pos_of_last_change(self, entity):
diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py
new file mode 100644
index 0000000000..67ece166c7
--- /dev/null
+++ b/tests/util/test_stream_change_cache.py
@@ -0,0 +1,198 @@
+from tests import unittest
+from mock import patch
+
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+
+class StreamChangeCacheTests(unittest.TestCase):
+    """
+    Tests for StreamChangeCache.
+    """
+
+    def test_prefilled_cache(self):
+        """
+        Providing a prefilled cache to StreamChangeCache will result in a cache
+        with the prefilled-cache entered in.
+        """
+        cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
+        self.assertTrue(cache.has_entity_changed("user@foo.com", 1))
+
+    def test_has_entity_changed(self):
+        """
+        StreamChangeCache.entity_has_changed will mark entities as changed, and
+        has_entity_changed will observe the changed entities.
+        """
+        cache = StreamChangeCache("#test", 3)
+
+        cache.entity_has_changed("user@foo.com", 6)
+        cache.entity_has_changed("bar@baz.net", 7)
+
+        # If it's been changed after that stream position, return True
+        self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
+        self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
+
+        # If it's been changed at that stream position, return False
+        self.assertFalse(cache.has_entity_changed("user@foo.com", 6))
+
+        # If there's no changes after that stream position, return False
+        self.assertFalse(cache.has_entity_changed("user@foo.com", 7))
+
+        # If the entity does not exist, return False.
+        self.assertFalse(cache.has_entity_changed("not@here.website", 7))
+
+        # If we request before the stream cache's earliest known position,
+        # return True, whether it's a known entity or not.
+        self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
+        self.assertTrue(cache.has_entity_changed("not@here.website", 0))
+
+    @patch("synapse.util.caches.CACHE_SIZE_FACTOR", 1.0)
+    def test_has_entity_changed_pops_off_start(self):
+        """
+        StreamChangeCache.entity_has_changed will respect the max size and
+        purge the oldest items upon reaching that max size.
+        """
+        cache = StreamChangeCache("#test", 1, max_size=2)
+
+        cache.entity_has_changed("user@foo.com", 2)
+        cache.entity_has_changed("bar@baz.net", 3)
+        cache.entity_has_changed("user@elsewhere.org", 4)
+
+        # The cache is at the max size, 2
+        self.assertEqual(len(cache._cache), 2)
+
+        # The oldest item has been popped off
+        self.assertTrue("user@foo.com" not in cache._entity_to_key)
+
+        # If we update an existing entity, it keeps the two existing entities
+        cache.entity_has_changed("bar@baz.net", 5)
+        self.assertEqual(
+            set(["bar@baz.net", "user@elsewhere.org"]), set(cache._entity_to_key)
+        )
+
+    def test_get_all_entities_changed(self):
+        """
+        StreamChangeCache.get_all_entities_changed will return all changed
+        entities since the given position.  If the position is before the start
+        of the known stream, it returns None instead.
+        """
+        cache = StreamChangeCache("#test", 1)
+
+        cache.entity_has_changed("user@foo.com", 2)
+        cache.entity_has_changed("bar@baz.net", 3)
+        cache.entity_has_changed("user@elsewhere.org", 4)
+
+        self.assertEqual(
+            cache.get_all_entities_changed(1),
+            ["user@foo.com", "bar@baz.net", "user@elsewhere.org"],
+        )
+        self.assertEqual(
+            cache.get_all_entities_changed(2), ["bar@baz.net", "user@elsewhere.org"]
+        )
+        self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"])
+        self.assertEqual(cache.get_all_entities_changed(0), None)
+
+    def test_has_any_entity_changed(self):
+        """
+        StreamChangeCache.has_any_entity_changed will return True if any
+        entities have been changed since the provided stream position, and
+        False if they have not.  If the cache has entries and the provided
+        stream position is before it, it will return True, otherwise False if
+        the cache has no entries.
+        """
+        cache = StreamChangeCache("#test", 1)
+
+        # With no entities, it returns False for the past, present, and future.
+        self.assertFalse(cache.has_any_entity_changed(0))
+        self.assertFalse(cache.has_any_entity_changed(1))
+        self.assertFalse(cache.has_any_entity_changed(2))
+
+        # We add an entity
+        cache.entity_has_changed("user@foo.com", 2)
+
+        # With an entity, it returns True for the past, the stream start
+        # position, and False for the stream position the entity was changed
+        # on and ones after it.
+        self.assertTrue(cache.has_any_entity_changed(0))
+        self.assertTrue(cache.has_any_entity_changed(1))
+        self.assertFalse(cache.has_any_entity_changed(2))
+        self.assertFalse(cache.has_any_entity_changed(3))
+
+    def test_get_entities_changed(self):
+        """
+        StreamChangeCache.get_entities_changed will return the entities in the
+        given list that have changed since the provided stream ID.  If the
+        stream position is earlier than the earliest known position, it will
+        return all of the entities queried for.
+        """
+        cache = StreamChangeCache("#test", 1)
+
+        cache.entity_has_changed("user@foo.com", 2)
+        cache.entity_has_changed("bar@baz.net", 3)
+        cache.entity_has_changed("user@elsewhere.org", 4)
+
+        # Query all the entries, but mid-way through the stream. We should only
+        # get the ones after that point.
+        self.assertEqual(
+            cache.get_entities_changed(
+                ["user@foo.com", "bar@baz.net", "user@elsewhere.org"], stream_pos=2
+            ),
+            set(["bar@baz.net", "user@elsewhere.org"]),
+        )
+
+        # Query all the entries mid-way through the stream, but include one
+        # that doesn't exist in it. We should get back the one that doesn't
+        # exist, too.
+        self.assertEqual(
+            cache.get_entities_changed(
+                [
+                    "user@foo.com",
+                    "bar@baz.net",
+                    "user@elsewhere.org",
+                    "not@here.website",
+                ],
+                stream_pos=2,
+            ),
+            set(["bar@baz.net", "user@elsewhere.org", "not@here.website"]),
+        )
+
+        # Query all the entries, but before the first known point. We will get
+        # all the entries we queried for, including ones that don't exist.
+        self.assertEqual(
+            cache.get_entities_changed(
+                [
+                    "user@foo.com",
+                    "bar@baz.net",
+                    "user@elsewhere.org",
+                    "not@here.website",
+                ],
+                stream_pos=0,
+            ),
+            set(
+                [
+                    "user@foo.com",
+                    "bar@baz.net",
+                    "user@elsewhere.org",
+                    "not@here.website",
+                ]
+            ),
+        )
+
+    def test_max_pos(self):
+        """
+        StreamChangeCache.get_max_pos_of_last_change will return the most
+        recent point where the entity could have changed.  If the entity is not
+        known, the stream start is provided instead.
+        """
+        cache = StreamChangeCache("#test", 1)
+
+        cache.entity_has_changed("user@foo.com", 2)
+        cache.entity_has_changed("bar@baz.net", 3)
+        cache.entity_has_changed("user@elsewhere.org", 4)
+
+        # Known entities will return the point where they were changed.
+        self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 2)
+        self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3)
+        self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4)
+
+        # Unknown entities will return the stream start position.
+        self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), 1)
diff --git a/tox.ini b/tox.ini
index 99b35f399a..5d79098d2f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -52,33 +52,41 @@ commands =
     /usr/bin/find "{toxinidir}" -name '*.pyc' -delete
     coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
         "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config \
-		tests/appservice/test_scheduler.py \
+		tests/api/test_filtering.py \
+		tests/api/test_ratelimiting.py \
+		tests/appservice \
+		tests/crypto \
+		tests/events \
+		tests/handlers/test_appservice.py \
 		tests/handlers/test_auth.py \
+		tests/handlers/test_device.py \
+		tests/handlers/test_directory.py \
+		tests/handlers/test_e2e_keys.py \
 		tests/handlers/test_presence.py \
+		tests/handlers/test_profile.py \
 		tests/handlers/test_register.py \
+		tests/replication/slave/storage/test_account_data.py \
+		tests/replication/slave/storage/test_receipts.py \
 		tests/storage/test_appservice.py \
+		tests/storage/test_background_update.py \
 		tests/storage/test_base.py \
+		tests/storage/test__base.py \
 		tests/storage/test_client_ips.py \
 		tests/storage/test_devices.py \
 		tests/storage/test_end_to_end_keys.py \
 		tests/storage/test_event_push_actions.py \
+		tests/storage/test_keys.py \
+		tests/storage/test_presence.py \
 		tests/storage/test_profile.py \
+		tests/storage/test_registration.py \
 		tests/storage/test_room.py \
+		tests/storage/test_user_directory.py \
 		tests/test_distributor.py \
 		tests/test_dns.py \
 		tests/test_preview.py \
 		tests/test_test_utils.py \
 		tests/test_types.py \
-		tests/util/test_dict_cache.py \
-		tests/util/test_expiring_cache.py \
-		tests/util/test_file_consumer.py \
-		tests/util/test_limiter.py \
-		tests/util/test_linearizer.py \
-		tests/util/test_logcontext.py \
-		tests/util/test_logformatter.py \
-		tests/util/test_rwlock.py \
-		tests/util/test_snapshot_cache.py \
-		tests/util/test_wheel_timer.py} \
+		tests/util} \
         {env:TOXSUFFIX:}
     {env:DUMP_COVERAGE_COMMAND:coverage report -m}