summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-02-03 11:06:29 +0000
committerErik Johnston <erik@matrix.org>2016-02-03 11:06:29 +0000
commit6f52e90065243682d93e5f89b071e1fb056fa6ab (patch)
tree98b9c43dd42dbc3d14a96f1b81e0631c7ac271c9 /synapse
parentUnused import (diff)
parentMerge pull request #553 from matrix-org/daniel/accesstokenlogging (diff)
downloadsynapse-6f52e90065243682d93e5f89b071e1fb056fa6ab.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/public_room_fix
Diffstat (limited to '')
-rw-r--r--synapse/api/auth.py4
-rw-r--r--synapse/app/__init__.py19
-rwxr-xr-xsynapse/app/homeserver.py38
-rw-r--r--synapse/appservice/api.py2
-rw-r--r--synapse/federation/federation_client.py2
-rw-r--r--synapse/handlers/_base.py2
-rw-r--r--synapse/handlers/directory.py4
-rw-r--r--synapse/handlers/events.py2
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/handlers/register.py6
-rw-r--r--synapse/handlers/room.py11
-rw-r--r--synapse/handlers/sync.py167
-rw-r--r--synapse/http/matrixfederationclient.py2
-rw-r--r--synapse/notifier.py2
-rw-r--r--synapse/push/push_rule_evaluator.py2
-rw-r--r--synapse/rest/client/v1/login.py2
-rw-r--r--synapse/rest/client/v1/pusher.py4
-rw-r--r--synapse/rest/client/v1/register.py3
-rw-r--r--synapse/rest/client/v2_alpha/register.py4
-rw-r--r--synapse/rest/client/v2_alpha/sync.py75
-rw-r--r--synapse/rest/client/versions.py4
-rw-r--r--synapse/server.py2
-rw-r--r--synapse/state.py2
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py7
-rw-r--r--synapse/storage/engines/sqlite3.py2
-rw-r--r--synapse/storage/event_federation.py2
-rw-r--r--synapse/storage/events.py6
-rw-r--r--synapse/storage/stream.py59
-rw-r--r--synapse/util/__init__.py2
-rw-r--r--synapse/util/caches/descriptors.py4
-rw-r--r--synapse/util/caches/expiringcache.py2
-rw-r--r--synapse/util/caches/treecache.py2
-rw-r--r--synapse/util/logutils.py2
-rw-r--r--synapse/util/ratelimitutils.py2
35 files changed, 215 insertions, 238 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index b5536e8565..5bba9343f6 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -574,7 +574,7 @@ class Auth(object):
             raise AuthError(
                 403,
                 "Application service has not registered this user"
-                )
+            )
         defer.returnValue(user_id)
 
     @defer.inlineCallbacks
@@ -696,6 +696,7 @@ class Auth(object):
     def _look_up_user_by_access_token(self, token):
         ret = yield self.store.get_user_by_access_token(token)
         if not ret:
+            logger.warn("Unrecognised access token - not in store: %s" % (token,))
             raise AuthError(
                 self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.",
                 errcode=Codes.UNKNOWN_TOKEN
@@ -713,6 +714,7 @@ class Auth(object):
             token = request.args["access_token"][0]
             service = yield self.store.get_app_service_by_token(token)
             if not service:
+                logger.warn("Unrecognised appservice access token: %s" % (token,))
                 raise AuthError(
                     self.TOKEN_NOT_FOUND_HTTP_STATUS,
                     "Unrecognised access token.",
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index bfebb0f644..1bc4279807 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -12,3 +12,22 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import sys
+sys.dont_write_bytecode = True
+
+from synapse.python_dependencies import (
+    check_requirements, MissingRequirementError
+)  # NOQA
+
+try:
+    check_requirements()
+except MissingRequirementError as e:
+    message = "\n".join([
+        "Missing Requirement: %s" % (e.message,),
+        "To install run:",
+        "    pip install --upgrade --force \"%s\"" % (e.dependency,),
+        "",
+    ])
+    sys.stderr.writelines(message)
+    sys.exit(1)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e5066c48ef..c3066d6a0d 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -14,27 +14,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import synapse
+
+import contextlib
+import logging
+import os
+import re
+import resource
+import subprocess
 import sys
-from synapse.rest import ClientRestResource
+import time
 
-sys.dont_write_bytecode = True
 from synapse.python_dependencies import (
-    check_requirements, DEPENDENCY_LINKS, MissingRequirementError
+    check_requirements, DEPENDENCY_LINKS
 )
 
-if __name__ == '__main__':
-    try:
-        check_requirements()
-    except MissingRequirementError as e:
-        message = "\n".join([
-            "Missing Requirement: %s" % (e.message,),
-            "To install run:",
-            "    pip install --upgrade --force \"%s\"" % (e.dependency,),
-            "",
-        ])
-        sys.stderr.writelines(message)
-        sys.exit(1)
-
+from synapse.rest import ClientRestResource
 from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
 from synapse.storage import are_all_users_on_domain
 from synapse.storage.prepare_database import UpgradeDatabaseException
@@ -73,17 +68,6 @@ from synapse import events
 
 from daemonize import Daemonize
 
-import synapse
-
-import contextlib
-import logging
-import os
-import re
-import resource
-import subprocess
-import time
-
-
 logger = logging.getLogger("synapse.app.homeserver")
 
 
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index e1c07028e8..bc90605324 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -29,7 +29,7 @@ class ApplicationServiceApi(SimpleHttpClient):
     pushing.
     """
 
-    def __init__(self,  hs):
+    def __init__(self, hs):
         super(ApplicationServiceApi, self).__init__(hs)
         self.clock = hs.get_clock()
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c6259f9dc8..e30e2da58d 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -57,7 +57,7 @@ class FederationClient(FederationBase):
             cache_name="get_pdu_cache",
             clock=self._clock,
             max_len=1000,
-            expiry_ms=120*1000,
+            expiry_ms=120 * 1000,
             reset_expiry_on_get=False,
         )
 
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 744a9ee507..1423df6cf3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -147,7 +147,7 @@ class BaseHandler(object):
         )
         if not allowed:
             raise LimitExceededError(
-                retry_after_ms=int(1000*(time_allowed - time_now)),
+                retry_after_ms=int(1000 * (time_allowed - time_now)),
             )
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 691564c651..4efecb1ffd 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -175,8 +175,8 @@ class DirectoryHandler(BaseHandler):
         # If this server is in the list of servers, return it first.
         if self.server_name in servers:
             servers = (
-                [self.server_name]
-                + [s for s in servers if s != self.server_name]
+                [self.server_name] +
+                [s for s in servers if s != self.server_name]
             )
         else:
             servers = list(servers)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 254b483da6..5ad8f3779a 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -130,7 +130,7 @@ class EventStreamHandler(BaseHandler):
 
                 # Add some randomness to this value to try and mitigate against
                 # thundering herds on restart.
-                timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+                timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
 
             events, tokens = yield self.notifier.get_events_for(
                 auth_user, pagin_config, timeout,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d36eb3b8d7..d0c21ff5c9 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -34,7 +34,7 @@ metrics = synapse.metrics.get_metrics_for(__name__)
 
 
 # Don't bother bumping "last active" time if it differs by less than 60 seconds
-LAST_ACTIVE_GRANULARITY = 60*1000
+LAST_ACTIVE_GRANULARITY = 60 * 1000
 
 # Keep no more than this number of offline serial revisions
 MAX_OFFLINE_SERIALS = 1000
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c11b98d0b7..b8fbcf9233 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -139,7 +139,9 @@ class RegistrationHandler(BaseHandler):
                     yield self.store.register(
                         user_id=user_id,
                         token=token,
-                        password_hash=password_hash)
+                        password_hash=password_hash,
+                        make_guest=make_guest
+                    )
 
                     yield registered_user(self.distributor, user)
                 except SynapseError:
@@ -211,7 +213,7 @@ class RegistrationHandler(BaseHandler):
                 400,
                 "User ID must only contain characters which do not"
                 " require URL encoding."
-                )
+            )
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 1b3f624c67..0daf2ce28e 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 
-from synapse.types import UserID, RoomAlias, RoomID
+from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
 from synapse.api.constants import (
     EventTypes, Membership, JoinRules, RoomCreationPreset,
 )
@@ -967,7 +967,7 @@ class RoomContextHandler(BaseHandler):
         Returns:
             dict, or None if the event isn't found
         """
-        before_limit = math.floor(limit/2.)
+        before_limit = math.floor(limit / 2.)
         after_limit = limit - before_limit
 
         now_token = yield self.hs.get_event_sources().get_current_token()
@@ -1037,6 +1037,11 @@ class RoomEventSource(object):
 
         to_key = yield self.get_current_key()
 
+        from_token = RoomStreamToken.parse(from_key)
+        if from_token.topological:
+            logger.warn("Stream has topological part!!!! %r", from_key)
+            from_key = "s%s" % (from_token.stream,)
+
         app_service = yield self.store.get_app_service_by_user_id(
             user.to_string()
         )
@@ -1048,7 +1053,7 @@ class RoomEventSource(object):
                 limit=limit,
             )
         else:
-            room_events = yield self.store.get_room_changes_for_user(
+            room_events = yield self.store.get_membership_changes_for_user(
                 user.to_string(), from_key, to_key
             )
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 075566417f..dc686db541 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
 
 import collections
 import logging
+import itertools
 
 logger = logging.getLogger(__name__)
 
@@ -478,7 +479,7 @@ class SyncHandler(BaseHandler):
         )
 
         # Get a list of membership change events that have happened.
-        rooms_changed = yield self.store.get_room_changes_for_user(
+        rooms_changed = yield self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
         )
 
@@ -672,35 +673,10 @@ class SyncHandler(BaseHandler):
                                            account_data_by_room,
                                            all_ephemeral_by_room,
                                            batch, full_state=False):
-        if full_state:
-            state = yield self.get_state_at(room_id, now_token)
-
-        elif batch.limited:
-            current_state = yield self.get_state_at(room_id, now_token)
-
-            state_at_previous_sync = yield self.get_state_at(
-                room_id, stream_position=since_token
-            )
-
-            state = yield self.compute_state_delta(
-                since_token=since_token,
-                previous_state=state_at_previous_sync,
-                current_state=current_state,
-            )
-        else:
-            state = {
-                (event.type, event.state_key): event
-                for event in batch.events if event.is_state()
-            }
-
-        just_joined = yield self.check_joined_room(sync_config, state)
-        if just_joined:
-            state = yield self.get_state_at(room_id, now_token)
-
-        state = {
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(state.values())
-        }
+        state = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, now_token,
+            full_state=full_state
+        )
 
         account_data = self.account_data_for_room(
             room_id, tags_by_room, account_data_by_room
@@ -766,30 +742,11 @@ class SyncHandler(BaseHandler):
 
         logger.debug("Recents %r", batch)
 
-        state_events_at_leave = yield self.store.get_state_for_event(
-            leave_event_id
+        state_events_delta = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, leave_token,
+            full_state=full_state
         )
 
-        if not full_state:
-            state_at_previous_sync = yield self.get_state_at(
-                room_id, stream_position=since_token
-            )
-
-            state_events_delta = yield self.compute_state_delta(
-                since_token=since_token,
-                previous_state=state_at_previous_sync,
-                current_state=state_events_at_leave,
-            )
-        else:
-            state_events_delta = state_events_at_leave
-
-        state_events_delta = {
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(
-                state_events_delta.values()
-            )
-        }
-
         account_data = self.account_data_for_room(
             room_id, tags_by_room, account_data_by_room
         )
@@ -843,15 +800,19 @@ class SyncHandler(BaseHandler):
             state = {}
         defer.returnValue(state)
 
-    def compute_state_delta(self, since_token, previous_state, current_state):
-        """ Works out the differnce in state between the current state and the
-        state the client got when it last performed a sync.
-
-        :param str since_token: the point we are comparing against
-        :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
-            state to compare to
-        :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
-            new state
+    @defer.inlineCallbacks
+    def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
+                            full_state):
+        """ Works out the differnce in state between the start of the timeline
+        and the previous sync.
+
+        :param str room_id
+        :param TimelineBatch batch: The timeline batch for the room that will
+            be sent to the user.
+        :param sync_config
+        :param str since_token: Token of the end of the previous batch. May be None.
+        :param str now_token: Token of the end of the current batch.
+        :param bool full_state: Whether to force returning the full state.
 
         :returns A new event dictionary
         """
@@ -860,12 +821,50 @@ class SyncHandler(BaseHandler):
         # updates even if they occured logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.
 
-        state_delta = {}
-        for key, event in current_state.iteritems():
-            if (key not in previous_state or
-                    previous_state[key].event_id != event.event_id):
-                state_delta[key] = event
-        return state_delta
+        if full_state:
+            if batch:
+                state = yield self.store.get_state_for_event(batch.events[0].event_id)
+            else:
+                state = yield self.get_state_at(
+                    room_id, stream_position=now_token
+                )
+
+            timeline_state = {
+                (event.type, event.state_key): event
+                for event in batch.events if event.is_state()
+            }
+
+            state = _calculate_state(
+                timeline_contains=timeline_state,
+                timeline_start=state,
+                previous={},
+            )
+        elif batch.limited:
+            state_at_previous_sync = yield self.get_state_at(
+                room_id, stream_position=since_token
+            )
+
+            state_at_timeline_start = yield self.store.get_state_for_event(
+                batch.events[0].event_id
+            )
+
+            timeline_state = {
+                (event.type, event.state_key): event
+                for event in batch.events if event.is_state()
+            }
+
+            state = _calculate_state(
+                timeline_contains=timeline_state,
+                timeline_start=state_at_timeline_start,
+                previous=state_at_previous_sync,
+            )
+        else:
+            state = {}
+
+        defer.returnValue({
+            (e.type, e.state_key): e
+            for e in sync_config.filter_collection.filter_room_state(state.values())
+        })
 
     def check_joined_room(self, sync_config, state_delta):
         """
@@ -912,3 +911,37 @@ def _action_has_highlight(actions):
             pass
 
     return False
+
+
+def _calculate_state(timeline_contains, timeline_start, previous):
+    """Works out what state to include in a sync response.
+
+    Args:
+        timeline_contains (dict): state in the timeline
+        timeline_start (dict): state at the start of the timeline
+        previous (dict): state at the end of the previous sync (or empty dict
+            if this is an initial sync)
+
+    Returns:
+        dict
+    """
+    event_id_to_state = {
+        e.event_id: e
+        for e in itertools.chain(
+            timeline_contains.values(),
+            previous.values(),
+            timeline_start.values(),
+        )
+    }
+
+    tc_ids = set(e.event_id for e in timeline_contains.values())
+    p_ids = set(e.event_id for e in previous.values())
+    ts_ids = set(e.event_id for e in timeline_start.values())
+
+    state_ids = (ts_ids - p_ids) - tc_ids
+
+    evs = (event_id_to_state[e] for e in state_ids)
+    return {
+        (e.type, e.state_key): e
+        for e in evs
+    }
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index da13e32e78..c3589534f8 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -152,7 +152,7 @@ class MatrixFederationHttpClient(object):
 
                         return self.clock.time_bound_deferred(
                             request_deferred,
-                            time_out=timeout/1000. if timeout else 60,
+                            time_out=timeout / 1000. if timeout else 60,
                         )
 
                     response = yield preserve_context_over_fn(
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 29965a9ab5..1a90bd55cd 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -308,7 +308,7 @@ class Notifier(object):
             def timed_out():
                 if listener:
                     listener.deferred.cancel()
-            timer = self.clock.call_later(timeout/1000., timed_out)
+            timer = self.clock.call_later(timeout / 1000., timed_out)
 
             prev_token = from_token
             while not result:
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index dca018af95..2a2b4437dc 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -304,7 +304,7 @@ def _flatten_dict(d, prefix=[], result={}):
         if isinstance(value, basestring):
             result[".".join(prefix + [key])] = value.lower()
         elif hasattr(value, "items"):
-            _flatten_dict(value, prefix=(prefix+[key]), result=result)
+            _flatten_dict(value, prefix=(prefix + [key]), result=result)
 
     return result
 
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 07836709fb..7199113dac 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -89,7 +89,7 @@ class LoginRestServlet(ClientV1RestServlet):
                                          LoginRestServlet.SAML2_TYPE):
                 relay_state = ""
                 if "relay_state" in login_submission:
-                    relay_state = "&RelayState="+urllib.quote(
+                    relay_state = "&RelayState=" + urllib.quote(
                                   login_submission["relay_state"])
                 result = {
                     "uri": "%s%s" % (self.idp_redirect_url, relay_state)
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index e218ed215c..5547f1b112 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -52,7 +52,7 @@ class PusherRestServlet(ClientV1RestServlet):
             if i not in content:
                 missing.append(i)
         if len(missing):
-            raise SynapseError(400, "Missing parameters: "+','.join(missing),
+            raise SynapseError(400, "Missing parameters: " + ','.join(missing),
                                errcode=Codes.MISSING_PARAM)
 
         logger.debug("set pushkey %s to kind %s", content['pushkey'], content['kind'])
@@ -83,7 +83,7 @@ class PusherRestServlet(ClientV1RestServlet):
                 data=content['data']
             )
         except PusherConfigException as pce:
-            raise SynapseError(400, "Config Error: "+pce.message,
+            raise SynapseError(400, "Config Error: " + pce.message,
                                errcode=Codes.MISSING_PARAM)
 
         defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 5378a9a938..2bfd4d96bf 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -38,7 +38,8 @@ logger = logging.getLogger(__name__)
 if hasattr(hmac, "compare_digest"):
     compare_digest = hmac.compare_digest
 else:
-    compare_digest = lambda a, b: a == b
+    def compare_digest(a, b):
+        return a == b
 
 
 class RegisterRestServlet(ClientV1RestServlet):
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index c4d025b465..56a5bbec30 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -34,7 +34,8 @@ from synapse.util.async import run_on_reactor
 if hasattr(hmac, "compare_digest"):
     compare_digest = hmac.compare_digest
 else:
-    compare_digest = lambda a, b: a == b
+    def compare_digest(a, b):
+        return a == b
 
 
 logger = logging.getLogger(__name__)
@@ -152,6 +153,7 @@ class RegisterRestServlet(RestServlet):
 
         desired_username = params.get("username", None)
         new_password = params.get("password", None)
+        guest_access_token = params.get("guest_access_token", None)
 
         (user_id, token) = yield self.registration_handler.register(
             localpart=desired_username,
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 07b5b5dfd5..140ce2704b 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -20,7 +20,6 @@ from synapse.http.servlet import (
 )
 from synapse.handlers.sync import SyncConfig
 from synapse.types import StreamToken
-from synapse.events import FrozenEvent
 from synapse.events.utils import (
     serialize_event, format_event_for_client_v2_without_room_id,
 )
@@ -287,9 +286,6 @@ class SyncRestServlet(RestServlet):
         state_dict = room.state
         timeline_events = room.timeline.events
 
-        state_dict = SyncRestServlet._rollback_state_for_timeline(
-            state_dict, timeline_events)
-
         state_events = state_dict.values()
 
         serialized_state = [serialize(e) for e in state_events]
@@ -314,77 +310,6 @@ class SyncRestServlet(RestServlet):
 
         return result
 
-    @staticmethod
-    def _rollback_state_for_timeline(state, timeline):
-        """
-        Wind the state dictionary backwards, so that it represents the
-        state at the start of the timeline, rather than at the end.
-
-        :param dict[(str, str), synapse.events.EventBase] state: the
-            state dictionary. Will be updated to the state before the timeline.
-        :param list[synapse.events.EventBase] timeline: the event timeline
-        :return: updated state dictionary
-        """
-
-        result = state.copy()
-
-        for timeline_event in reversed(timeline):
-            if not timeline_event.is_state():
-                continue
-
-            event_key = (timeline_event.type, timeline_event.state_key)
-
-            logger.debug("Considering %s for removal", event_key)
-
-            state_event = result.get(event_key)
-            if (state_event is None or
-                    state_event.event_id != timeline_event.event_id):
-                # the event in the timeline isn't present in the state
-                # dictionary.
-                #
-                # the most likely cause for this is that there was a fork in
-                # the event graph, and the state is no longer valid. Really,
-                # the event shouldn't be in the timeline. We're going to ignore
-                # it for now, however.
-                logger.debug("Found state event %r in timeline which doesn't "
-                             "match state dictionary", timeline_event)
-                continue
-
-            prev_event_id = timeline_event.unsigned.get("replaces_state", None)
-
-            prev_content = timeline_event.unsigned.get('prev_content')
-            prev_sender = timeline_event.unsigned.get('prev_sender')
-            # Empircally it seems possible for the event to have a
-            # "replaces_state" key but not a prev_content or prev_sender
-            # markjh conjectures that it could be due to the server not
-            # having a copy of that event.
-            # If this is the case the we ignore the previous event. This will
-            # cause the displayname calculations on the client to be incorrect
-            if prev_event_id is None or not prev_content or not prev_sender:
-                logger.debug(
-                    "Removing %r from the state dict, as it is missing"
-                    " prev_content (prev_event_id=%r)",
-                    timeline_event.event_id, prev_event_id
-                )
-                del result[event_key]
-            else:
-                logger.debug(
-                    "Replacing %r with %r in state dict",
-                    timeline_event.event_id, prev_event_id
-                )
-                result[event_key] = FrozenEvent({
-                    "type": timeline_event.type,
-                    "state_key": timeline_event.state_key,
-                    "content": prev_content,
-                    "sender": prev_sender,
-                    "event_id": prev_event_id,
-                    "room_id": timeline_event.room_id,
-                })
-
-            logger.debug("New value: %r", result.get(event_key))
-
-        return result
-
 
 def register_servlets(hs, http_server):
     SyncRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 349ef6b396..ca5468c402 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -26,9 +26,7 @@ class VersionsRestServlet(RestServlet):
 
     def on_GET(self, request):
         return (200, {
-            "versions": [
-                "r0.0.1",
-            ]
+            "versions": ["r0.0.1"]
         })
 
 
diff --git a/synapse/server.py b/synapse/server.py
index 5fee7fe130..368d615576 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -23,7 +23,7 @@ from twisted.web.client import BrowserLikePolicyForHTTPS
 from twisted.enterprise import adbapi
 
 from synapse.federation import initialize_http_replication
-from synapse.http.client import SimpleHttpClient,  InsecureInterceptableContextFactory
+from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.notifier import Notifier
 from synapse.api.auth import Auth
 from synapse.handlers import Handlers
diff --git a/synapse/state.py b/synapse/state.py
index 0acf309fe0..b9a1387520 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -63,7 +63,7 @@ class StateHandler(object):
             cache_name="state_cache",
             clock=self.clock,
             max_len=SIZE_OF_CACHE,
-            expiry_ms=EVICTION_TIMEOUT_SECONDS*1000,
+            expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
             reset_expiry_on_get=True,
         )
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index c91c7a3729..5a9e7720d9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -59,7 +59,7 @@ logger = logging.getLogger(__name__)
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
 # times give more inserts into the database even for readonly API hits
 # 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120*1000
+LAST_SEEN_GRANULARITY = 120 * 1000
 
 
 class DataStore(RoomMemberStore, RoomStore,
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 5e77320540..cfb87d9328 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -185,7 +185,7 @@ class SQLBaseStore(object):
             time_then = self._previous_loop_ts
             self._previous_loop_ts = time_now
 
-            ratio = (curr - prev)/(time_now - time_then)
+            ratio = (curr - prev) / (time_now - time_then)
 
             top_three_counters = self._txn_perf_counters.interval(
                 time_now - time_then, limit=3
@@ -643,7 +643,10 @@ class SQLBaseStore(object):
         if not iterable:
             defer.returnValue(results)
 
-        chunks = [iterable[i:i+batch_size] for i in xrange(0, len(iterable), batch_size)]
+        chunks = [
+            iterable[i:i + batch_size]
+            for i in xrange(0, len(iterable), batch_size)
+        ]
         for chunk in chunks:
             rows = yield self.runInteraction(
                 desc,
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 400c10103c..91fac33b8b 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -54,7 +54,7 @@ class Sqlite3Engine(object):
 
 def _parse_match_info(buf):
     bufsize = len(buf)
-    return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)]
+    return [struct.unpack('@I', buf[i:i + 4])[0] for i in range(0, bufsize, 4)]
 
 
 def _rank(raw_match_info):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5f32eec6f8..ce2c794025 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -58,7 +58,7 @@ class EventFederationStore(SQLBaseStore):
             new_front = set()
             front_list = list(front)
             chunks = [
-                front_list[x:x+100]
+                front_list[x:x + 100]
                 for x in xrange(0, len(front), 100)
             ]
             for chunk in chunks:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5e85552029..4d7cdd00d0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -84,7 +84,7 @@ class EventsStore(SQLBaseStore):
                 event.internal_metadata.stream_ordering = stream
 
             chunks = [
-                events_and_contexts[x:x+100]
+                events_and_contexts[x:x + 100]
                 for x in xrange(0, len(events_and_contexts), 100)
             ]
 
@@ -740,7 +740,7 @@ class EventsStore(SQLBaseStore):
         rows = []
         N = 200
         for i in range(1 + len(events) / N):
-            evs = events[i*N:(i + 1)*N]
+            evs = events[i * N:(i + 1) * N]
             if not evs:
                 break
 
@@ -755,7 +755,7 @@ class EventsStore(SQLBaseStore):
                 " LEFT JOIN rejections as rej USING (event_id)"
                 " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                 " WHERE e.event_id IN (%s)"
-            ) % (",".join(["?"]*len(evs)),)
+            ) % (",".join(["?"] * len(evs)),)
 
             txn.execute(sql, evs)
             rows.extend(self.cursor_to_dict(txn))
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a03458c2fc..2c49a5e499 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore):
 
         results = {}
         room_ids = list(room_ids)
-        for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+        for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
             res = yield defer.gatherResults([
                 self.get_room_events_stream_for_room(
                     room_id, from_key, to_key, limit
@@ -220,28 +220,30 @@ class StreamStore(SQLBaseStore):
 
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
-            ret.reverse()
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            if rows:
-                key = "s%d" % min(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = from_key
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-            return ret, key
-        res = yield self.runInteraction("get_room_events_stream_for_room", f)
-        defer.returnValue(res)
+        ret.reverse()
 
-    def get_room_changes_for_user(self, user_id, from_key, to_key):
+        if rows:
+            key = "s%d" % min(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = from_key
+
+        defer.returnValue((ret, key))
+
+    @defer.inlineCallbacks
+    def get_membership_changes_for_user(self, user_id, from_key, to_key):
         if from_key is not None:
             from_id = RoomStreamToken.parse_stream_token(from_key).stream
         else:
@@ -249,14 +251,14 @@ class StreamStore(SQLBaseStore):
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
-            return defer.succeed([])
+            defer.returnValue([])
 
         if from_id:
             has_changed = self._membership_stream_cache.has_entity_changed(
                 user_id, int(from_id)
             )
             if not has_changed:
-                return defer.succeed([])
+                defer.returnValue([])
 
         def f(txn):
             if from_id is not None:
@@ -281,17 +283,18 @@ class StreamStore(SQLBaseStore):
                 txn.execute(sql, (user_id, to_id,))
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
+
+        rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            return ret
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-        return self.runInteraction("get_room_changes_for_user", f)
+        defer.returnValue(ret)
 
     def get_room_events_stream(
         self,
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index f1fe963adf..7566d9eb33 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -46,7 +46,7 @@ class Clock(object):
 
     def looping_call(self, f, msec):
         l = task.LoopingCall(f)
-        l.start(msec/1000.0, now=False)
+        l.start(msec / 1000.0, now=False)
         return l
 
     def stop_looping_call(self, loop):
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 88e56e3302..e27917c63a 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -149,7 +149,7 @@ class CacheDescriptor(object):
         self.lru = lru
         self.tree = tree
 
-        self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
+        self.arg_names = inspect.getargspec(orig).args[1:num_args + 1]
 
         if len(self.arg_names) < self.num_args:
             raise Exception(
@@ -250,7 +250,7 @@ class CacheListDescriptor(object):
         self.num_args = num_args
         self.list_name = list_name
 
-        self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
+        self.arg_names = inspect.getargspec(orig).args[1:num_args + 1]
         self.list_pos = self.arg_names.index(self.list_name)
 
         self.cache = cache
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 494226f5ea..62cae99649 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -55,7 +55,7 @@ class ExpiringCache(object):
         def f():
             self._prune_cache()
 
-        self._clock.looping_call(f, self._expiry_ms/2)
+        self._clock.looping_call(f, self._expiry_ms / 2)
 
     def __setitem__(self, key, value):
         now = self._clock.time_msec()
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 29d02f7e95..03bc1401b7 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -58,7 +58,7 @@ class TreeCache(object):
 
             if n:
                 break
-            node_and_keys[i+1][0].pop(k)
+            node_and_keys[i + 1][0].pop(k)
 
         popped, cnt = _strip_and_count_entires(popped)
         self.size -= cnt
diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py
index d5b1a37eff..c37a157787 100644
--- a/synapse/util/logutils.py
+++ b/synapse/util/logutils.py
@@ -111,7 +111,7 @@ def time_function(f):
             _log_debug_as_f(
                 f,
                 "[FUNC END] {%s-%d} %f",
-                (func_name, id, end-start,),
+                (func_name, id, end - start,),
             )
 
         return r
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index c37d6f12e3..ea321bc6a9 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -163,7 +163,7 @@ class _PerHostRatelimiter(object):
                 "Ratelimit [%s]: sleeping req",
                 id(request_id),
             )
-            ret_defer = sleep(self.sleep_msec/1000.0)
+            ret_defer = sleep(self.sleep_msec / 1000.0)
 
             self.sleeping_requests.add(request_id)