summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/__init__.py1
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/api/errors.py4
-rw-r--r--synapse/app/__init__.py1
-rw-r--r--synapse/config/_base.py19
-rw-r--r--synapse/config/database.py2
-rw-r--r--synapse/config/email.py7
-rw-r--r--synapse/config/logger.py3
-rw-r--r--synapse/config/ratelimiting.py1
-rw-r--r--synapse/config/repository.py1
-rw-r--r--synapse/config/server.py2
-rw-r--r--synapse/config/tls.py2
-rw-r--r--synapse/config/voip.py5
-rw-r--r--synapse/crypto/__init__.py1
-rw-r--r--synapse/crypto/context_factory.py2
-rw-r--r--synapse/crypto/keyclient.py1
-rw-r--r--synapse/crypto/keyring.py4
-rw-r--r--synapse/handlers/login.py2
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/presence.py63
-rw-r--r--synapse/handlers/register.py15
-rw-r--r--synapse/handlers/room.py26
-rw-r--r--synapse/handlers/typing.py16
-rw-r--r--synapse/http/__init__.py1
-rw-r--r--synapse/http/client.py11
-rw-r--r--synapse/http/content_repository.py10
-rw-r--r--synapse/notifier.py5
-rw-r--r--synapse/rest/profile.py6
-rw-r--r--synapse/rest/register.py56
-rw-r--r--synapse/rest/room.py54
-rw-r--r--synapse/rest/transactions.py14
-rw-r--r--synapse/rest/voip.py14
-rw-r--r--synapse/storage/keys.py3
-rw-r--r--synapse/storage/roommember.py8
-rw-r--r--synapse/streams/config.py23
-rw-r--r--synapse/streams/events.py2
-rw-r--r--synapse/test_pyflakes.py1
-rw-r--r--synapse/util/distributor.py3
-rw-r--r--synapse/util/emailutils.py6
-rw-r--r--synapse/util/jsonobject.py3
40 files changed, 224 insertions, 186 deletions
diff --git a/synapse/api/__init__.py b/synapse/api/__init__.py
index 9bff9ec169..f9811bfa04 100644
--- a/synapse/api/__init__.py
+++ b/synapse/api/__init__.py
@@ -12,4 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 618d3d7577..3cafff0e32 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -58,4 +58,4 @@ class LoginType(object):
     EMAIL_CODE = u"m.login.email.code"
     EMAIL_URL = u"m.login.email.url"
     EMAIL_IDENTITY = u"m.login.email.identity"
-    RECAPTCHA = u"m.login.recaptcha"
\ No newline at end of file
+    RECAPTCHA = u"m.login.recaptcha"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 6d7d499fea..38ccb4f9d1 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -54,7 +54,7 @@ class SynapseError(CodeMessageException):
         """Constructs a synapse error.
 
         Args:
-            code (int): The integer error code (typically an HTTP response code)
+            code (int): The integer error code (an HTTP response code)
             msg (str): The human-readable error message.
             err (str): The error code e.g 'M_FORBIDDEN'
         """
@@ -67,6 +67,7 @@ class SynapseError(CodeMessageException):
             self.errcode,
         )
 
+
 class RoomError(SynapseError):
     """An error raised when a room event fails."""
     pass
@@ -117,6 +118,7 @@ class InvalidCaptchaError(SynapseError):
             error_url=self.error_url,
         )
 
+
 class LimitExceededError(SynapseError):
     """A client has sent too many requests and is being throttled.
     """
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 9bff9ec169..f9811bfa04 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -12,4 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index b3aeff327c..8ebd2eba4a 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -116,18 +116,25 @@ class Config(object):
             config = {}
             for key, value in vars(args).items():
                 if (key not in set(["config_path", "generate_config"])
-                    and value is not None):
+                        and value is not None):
                     config[key] = value
             with open(config_args.config_path, "w") as config_file:
                 # TODO(paul) it would be lovely if we wrote out vim- and emacs-
                 #   style mode markers into the file, to hint to people that
                 #   this is a YAML file.
                 yaml.dump(config, config_file, default_flow_style=False)
-            print "A config file has been generated in %s for server name '%s') with corresponding SSL keys and self-signed certificates. Please review this file and customise it to your needs." % (config_args.config_path, config['server_name'])
-            print "If this server name is incorrect, you will need to regenerate the SSL certificates"
+            print (
+                "A config file has been generated in %s for server name"
+                " '%s' with corresponding SSL keys and self-signed"
+                " certificates. Please review this file and customise it to"
+                " your needs."
+            ) % (
+                config_args.config_path, config['server_name']
+            )
+            print (
+                "If this server name is incorrect, you will need to regenerate"
+                " the SSL certificates"
+            )
             sys.exit(0)
 
         return cls(args)
-
-
-
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 460445f15d..0aac8c8382 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -16,6 +16,7 @@
 from ._base import Config
 import os
 
+
 class DatabaseConfig(Config):
     def __init__(self, args):
         super(DatabaseConfig, self).__init__(args)
@@ -34,4 +35,3 @@ class DatabaseConfig(Config):
     def generate_config(cls, args, config_dir_path):
         super(DatabaseConfig, cls).generate_config(args, config_dir_path)
         args.database_path = os.path.abspath(args.database_path)
-
diff --git a/synapse/config/email.py b/synapse/config/email.py
index 9bcc5a8fea..6bab133224 100644
--- a/synapse/config/email.py
+++ b/synapse/config/email.py
@@ -35,5 +35,8 @@ class EmailConfig(Config):
         email_group.add_argument(
             "--email-smtp-server",
             default="",
-            help="The SMTP server to send emails from (e.g. for password resets)."
-        )
\ No newline at end of file
+            help=(
+                "The SMTP server to send emails from (e.g. for password"
+                " resets)."
+            )
+        )
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 56cd095433..05611d02f7 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -19,6 +19,7 @@ from twisted.python.log import PythonLoggingObserver
 import logging
 import logging.config
 
+
 class LoggingConfig(Config):
     def __init__(self, args):
         super(LoggingConfig, self).__init__(args)
@@ -51,7 +52,7 @@ class LoggingConfig(Config):
 
             level = logging.INFO
             if self.verbosity:
-               level = logging.DEBUG
+                level = logging.DEBUG
 
                # FIXME: we need a logging.WARN for a -q quiet option
 
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index f126782b8d..fb63ed7d9b 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -14,6 +14,7 @@
 
 from ._base import Config
 
+
 class RatelimitConfig(Config):
 
     def __init__(self, args):
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index b71d30227c..743bc26474 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -15,6 +15,7 @@
 
 from ._base import Config
 
+
 class ContentRepositoryConfig(Config):
     def __init__(self, args):
         super(ContentRepositoryConfig, self).__init__(args)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 086937044f..3afda12d5a 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -34,7 +34,7 @@ class ServerConfig(Config):
         if not args.content_addr:
             host = args.server_name
             if ':' not in host:
-                host  = "%s:%d" % (host, args.bind_port)
+                host = "%s:%d" % (host, args.bind_port)
             args.content_addr = "https://%s" % (host,)
 
         self.content_addr = args.content_addr
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 72d5518a89..3600c3ea9e 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -19,7 +19,7 @@ from OpenSSL import crypto
 import subprocess
 import os
 
-GENERATE_DH_PARAMS=False
+GENERATE_DH_PARAMS = False
 
 
 class TlsConfig(Config):
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 3a51664f46..06675966ce 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -33,7 +33,10 @@ class VoipConfig(Config):
         )
         group.add_argument(
             "--turn-shared-secret", type=str, default=None,
-            help="The shared secret used to compute passwords for the TURN server"
+            help=(
+                "The shared secret used to compute passwords for the TURN"
+                " server"
+            )
         )
         group.add_argument(
             "--turn-user-lifetime", type=int, default=(1000 * 60 * 60),
diff --git a/synapse/crypto/__init__.py b/synapse/crypto/__init__.py
index 9bff9ec169..f9811bfa04 100644
--- a/synapse/crypto/__init__.py
+++ b/synapse/crypto/__init__.py
@@ -12,4 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index f402c795bb..3143322d9c 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -20,6 +20,7 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+
 class ServerContextFactory(ssl.ContextFactory):
     """Factory for PyOpenSSL SSL contexts that are used to handle incoming
     connections and to make connections to remote servers."""
@@ -43,4 +44,3 @@ class ServerContextFactory(ssl.ContextFactory):
 
     def getContext(self):
         return self._context
-
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index 7cfec5148e..5191be4570 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -98,4 +98,3 @@ class SynapseKeyClientProtocol(HTTPClient):
 
 class SynapseKeyClientFactory(Factory):
     protocol = SynapseKeyClientProtocol
-
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 2440d604c3..694aed3a7d 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -44,7 +44,7 @@ class Keyring(object):
             raise SynapseError(
                 400,
                 "Not signed with a supported algorithm",
-                 Codes.UNAUTHORIZED,
+                Codes.UNAUTHORIZED,
             )
         try:
             verify_key = yield self.get_server_verify_key(server_name, key_ids)
@@ -100,7 +100,7 @@ class Keyring(object):
         )
 
         if ("signatures" not in response
-            or server_name not in response["signatures"]):
+                or server_name not in response["signatures"]):
             raise ValueError("Key response not signed by remote server")
 
         if "tls_certificate" not in response:
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
index 3f152e18f0..99d15261d4 100644
--- a/synapse/handlers/login.py
+++ b/synapse/handlers/login.py
@@ -54,7 +54,7 @@ class LoginHandler(BaseHandler):
         # pull out the hash for this user if they exist
         user_info = yield self.store.get_user_by_id(user_id=user)
         if not user_info:
-            logger.warn("Attempted to login as %s but they do not exist.", user)
+            logger.warn("Attempted to login as %s but they do not exist", user)
             raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
         stored_hash = user_info[0]["password_hash"]
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 65861033e9..c6f6ab14d1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -114,8 +114,12 @@ class MessageHandler(BaseHandler):
 
         user = self.hs.parse_userid(user_id)
 
-        events, next_token = yield data_source.get_pagination_rows(
-            user, pagin_config, room_id
+        events, next_key = yield data_source.get_pagination_rows(
+            user, pagin_config.get_source_config("room"), room_id
+        )
+
+        next_token = pagin_config.from_token.copy_and_replace(
+            "room_key", next_key
         )
 
         chunk = {
@@ -264,7 +268,7 @@ class MessageHandler(BaseHandler):
         presence_stream = self.hs.get_event_sources().sources["presence"]
         pagination_config = PaginationConfig(from_token=now_token)
         presence, _ = yield presence_stream.get_pagination_rows(
-            user, pagination_config, None
+            user, pagination_config.get_source_config("presence"), None
         )
 
         public_rooms = yield self.store.get_rooms(is_public=True)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b2af09f090..2ccc2245b7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -76,9 +76,7 @@ class PresenceHandler(BaseHandler):
             "stopped_user_eventstream", self.stopped_user_eventstream
         )
 
-        distributor.observe("user_joined_room",
-            self.user_joined_room
-        )
+        distributor.observe("user_joined_room", self.user_joined_room)
 
         distributor.declare("collect_presencelike_data")
 
@@ -156,14 +154,12 @@ class PresenceHandler(BaseHandler):
             defer.returnValue(True)
 
         if (yield self.store.user_rooms_intersect(
-            [u.to_string() for u in observer_user, observed_user]
-        )):
+                [u.to_string() for u in observer_user, observed_user])):
             defer.returnValue(True)
 
         if (yield self.store.is_presence_visible(
-            observed_localpart=observed_user.localpart,
-            observer_userid=observer_user.to_string(),
-        )):
+                observed_localpart=observed_user.localpart,
+                observer_userid=observer_user.to_string())):
             defer.returnValue(True)
 
         defer.returnValue(False)
@@ -171,7 +167,8 @@ class PresenceHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_state(self, target_user, auth_user):
         if target_user.is_mine:
-            visible = yield self.is_presence_visible(observer_user=auth_user,
+            visible = yield self.is_presence_visible(
+                observer_user=auth_user,
                 observed_user=target_user
             )
 
@@ -219,9 +216,9 @@ class PresenceHandler(BaseHandler):
                 )
 
         if state["presence"] not in self.STATE_LEVELS:
-            raise SynapseError(400, "'%s' is not a valid presence state" %
-                state["presence"]
-            )
+            raise SynapseError(400, "'%s' is not a valid presence state" % (
+                state["presence"],
+            ))
 
         logger.debug("Updating presence state of %s to %s",
                      target_user.localpart, state["presence"])
@@ -229,7 +226,7 @@ class PresenceHandler(BaseHandler):
         state_to_store = dict(state)
         state_to_store["state"] = state_to_store.pop("presence")
 
-        statuscache=self._get_or_offline_usercache(target_user)
+        statuscache = self._get_or_offline_usercache(target_user)
         was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
         now_level = self.STATE_LEVELS[state["presence"]]
 
@@ -649,8 +646,9 @@ class PresenceHandler(BaseHandler):
             del state["user_id"]
 
             if "presence" not in state:
-                logger.warning("Received a presence 'push' EDU from %s without"
-                    + " a 'presence' key", origin
+                logger.warning(
+                    "Received a presence 'push' EDU from %s without a"
+                    " 'presence' key", origin
                 )
                 continue
 
@@ -745,7 +743,7 @@ class PresenceHandler(BaseHandler):
         defer.returnValue((localusers, remote_domains))
 
     def push_update_to_clients(self, observed_user, users_to_push=[],
-                                 room_ids=[], statuscache=None):
+                               room_ids=[], statuscache=None):
         self.notifier.on_new_user_event(
             users_to_push,
             room_ids,
@@ -765,8 +763,7 @@ class PresenceEventSource(object):
         presence = self.hs.get_handlers().presence_handler
 
         if (yield presence.store.user_rooms_intersect(
-            [u.to_string() for u in observer_user, observed_user]
-        )):
+                [u.to_string() for u in observer_user, observed_user])):
             defer.returnValue(True)
 
         if observed_user.is_mine:
@@ -823,15 +820,12 @@ class PresenceEventSource(object):
     def get_pagination_rows(self, user, pagination_config, key):
         # TODO (erikj): Does this make sense? Ordering?
 
-        from_token = pagination_config.from_token
-        to_token = pagination_config.to_token
-
         observer_user = user
 
-        from_key = int(from_token.presence_key)
+        from_key = int(pagination_config.from_key)
 
-        if to_token:
-            to_key = int(to_token.presence_key)
+        if pagination_config.to_key:
+            to_key = int(pagination_config.to_key)
         else:
             to_key = -1
 
@@ -841,7 +835,7 @@ class PresenceEventSource(object):
         updates = []
         # TODO(paul): use a DeferredList ? How to limit concurrency.
         for observed_user in cachemap.keys():
-            if not (to_key < cachemap[observed_user].serial < from_key):
+            if not (to_key < cachemap[observed_user].serial <= from_key):
                 continue
 
             if (yield self.is_visible(observer_user, observed_user)):
@@ -849,30 +843,15 @@ class PresenceEventSource(object):
 
         # TODO(paul): limit
 
-        updates = [(k, cachemap[k]) for k in cachemap
-                   if to_key < cachemap[k].serial < from_key]
-
         if updates:
             clock = self.clock
 
             earliest_serial = max([x[1].serial for x in updates])
             data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
 
-            if to_token:
-                next_token = to_token
-            else:
-                next_token = from_token
-
-            next_token = next_token.copy_and_replace(
-                "presence_key", earliest_serial
-            )
-            defer.returnValue((data, next_token))
+            defer.returnValue((data, earliest_serial))
         else:
-            if not to_token:
-                to_token = from_token.copy_and_replace(
-                    "presence_key", 0
-                )
-            defer.returnValue(([], to_token))
+            defer.returnValue(([], 0))
 
 
 class UserPresenceCache(object):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 94b7890b5e..88eb51a8ed 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -64,9 +64,11 @@ class RegistrationHandler(BaseHandler):
             user_id = user.to_string()
 
             token = self._generate_token(user_id)
-            yield self.store.register(user_id=user_id,
+            yield self.store.register(
+                user_id=user_id,
                 token=token,
-                password_hash=password_hash)
+                password_hash=password_hash
+            )
 
             self.distributor.fire("registered_user", user)
         else:
@@ -181,8 +183,11 @@ class RegistrationHandler(BaseHandler):
         data = yield httpCli.post_urlencoded_get_json(
             creds['idServer'],
             "/_matrix/identity/api/v1/3pid/bind",
-            {'sid': creds['sid'], 'clientSecret': creds['clientSecret'],
-            'mxid': mxid}
+            {
+                'sid': creds['sid'],
+                'clientSecret': creds['clientSecret'],
+                'mxid': mxid,
+            }
         )
         defer.returnValue(data)
 
@@ -223,5 +228,3 @@ class RegistrationHandler(BaseHandler):
             }
         )
         defer.returnValue(data)
-
-
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index cb5bd17d2b..ffc0892f1a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -170,11 +170,6 @@ class RoomCreationHandler(BaseHandler):
             content=content
         )
 
-        yield self.hs.get_handlers().room_member_handler.change_membership(
-            join_event,
-            do_auth=False
-        )
-
         content = {"membership": Membership.INVITE}
         for invitee in invite_list:
             invite_event = self.event_factory.create_event(
@@ -616,23 +611,14 @@ class RoomEventSource(object):
         return self.store.get_room_events_max_id()
 
     @defer.inlineCallbacks
-    def get_pagination_rows(self, user, pagination_config, key):
-        from_token = pagination_config.from_token
-        to_token = pagination_config.to_token
-        limit = pagination_config.limit
-        direction = pagination_config.direction
-
-        to_key = to_token.room_key if to_token else None
-
+    def get_pagination_rows(self, user, config, key):
         events, next_key = yield self.store.paginate_room_events(
             room_id=key,
-            from_key=from_token.room_key,
-            to_key=to_key,
-            direction=direction,
-            limit=limit,
+            from_key=config.from_key,
+            to_key=config.to_key,
+            direction=config.direction,
+            limit=config.limit,
             with_feedback=True
         )
 
-        next_token = from_token.copy_and_replace("room_key", next_key)
-
-        defer.returnValue((events, next_token))
+        defer.returnValue((events, next_key))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0ca4e5c31e..d88a53242c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -96,9 +96,10 @@ class TypingNotificationHandler(BaseHandler):
         remotedomains = set()
 
         rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(room_id,
-                localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user)
+        yield rm_handler.fetch_room_distributions_into(
+            room_id, localusers=localusers, remotedomains=remotedomains,
+            ignore_user=user
+        )
 
         for u in localusers:
             self.push_update_to_clients(
@@ -130,8 +131,9 @@ class TypingNotificationHandler(BaseHandler):
         localusers = set()
 
         rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(room_id,
-                localusers=localusers)
+        yield rm_handler.fetch_room_distributions_into(
+            room_id, localusers=localusers
+        )
 
         for u in localusers:
             self.push_update_to_clients(
@@ -142,7 +144,7 @@ class TypingNotificationHandler(BaseHandler):
             )
 
     def push_update_to_clients(self, room_id, observer_user, observed_user,
-            typing):
+                               typing):
         # TODO(paul) steal this from presence.py
         pass
 
@@ -158,4 +160,4 @@ class TypingNotificationEventSource(object):
         return 0
 
     def get_pagination_rows(self, user, pagination_config, key):
-        return ([], pagination_config.from_token)
+        return ([], pagination_config.from_key)
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 9bff9ec169..f9811bfa04 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -12,4 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 46c90dbb76..c34b086eb9 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -16,7 +16,9 @@
 
 from twisted.internet import defer, reactor
 from twisted.internet.error import DNSLookupError
-from twisted.web.client import _AgentBase, _URI, readBody, FileBodyProducer, PartialDownloadError
+from twisted.web.client import (
+    _AgentBase, _URI, readBody, FileBodyProducer, PartialDownloadError
+)
 from twisted.web.http_headers import Headers
 
 from synapse.http.endpoint import matrix_endpoint
@@ -97,7 +99,7 @@ class BaseHttpClient(object):
 
         retries_left = 5
 
-        endpoint = self._getEndpoint(reactor, destination);
+        endpoint = self._getEndpoint(reactor, destination)
 
         while True:
 
@@ -181,7 +183,7 @@ class MatrixHttpClient(BaseHttpClient):
 
         auth_headers = []
 
-        for key,sig in request["signatures"][self.server_name].items():
+        for key, sig in request["signatures"][self.server_name].items():
             auth_headers.append(bytes(
                 "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
                     self.server_name, key, sig,
@@ -276,7 +278,6 @@ class MatrixHttpClient(BaseHttpClient):
 
         defer.returnValue(json.loads(body))
 
-
     def _getEndpoint(self, reactor, destination):
         return matrix_endpoint(
             reactor, destination, timeout=10,
@@ -351,6 +352,7 @@ class IdentityServerHttpClient(BaseHttpClient):
 
         defer.returnValue(json.loads(body))
 
+
 class CaptchaServerHttpClient(MatrixHttpClient):
     """Separate HTTP client for talking to google's captcha servers"""
 
@@ -384,6 +386,7 @@ class CaptchaServerHttpClient(MatrixHttpClient):
             else:
                 raise e
 
+
 def _print_ex(e):
     if hasattr(e, "reasons") and e.reasons:
         for ex in e.reasons:
diff --git a/synapse/http/content_repository.py b/synapse/http/content_repository.py
index 7dd4a859f8..3159ffff0a 100644
--- a/synapse/http/content_repository.py
+++ b/synapse/http/content_repository.py
@@ -38,8 +38,8 @@ class ContentRepoResource(resource.Resource):
 
     Uploads are POSTed to wherever this Resource is linked to. This resource
     returns a "content token" which can be used to GET this content again. The
-    token is typically a path, but it may not be. Tokens can expire, be one-time
-    uses, etc.
+    token is typically a path, but it may not be. Tokens can expire, be
+    one-time uses, etc.
 
     In this case, the token is a path to the file and contains 3 interesting
     sections:
@@ -175,10 +175,9 @@ class ContentRepoResource(resource.Resource):
             with open(fname, "wb") as f:
                 f.write(request.content.read())
 
-
             # FIXME (erikj): These should use constants.
             file_name = os.path.basename(fname)
-            # FIXME: we can't assume what the public mounted path of the repo is
+            # FIXME: we can't assume what the repo's public mounted path is
             # ...plus self-signed SSL won't work to remote clients anyway
             # ...and we can't assume that it's SSL anyway, as we might want to
             # server it via the non-SSL listener...
@@ -201,6 +200,3 @@ class ContentRepoResource(resource.Resource):
                 500,
                 json.dumps({"error": "Internal server error"}),
                 send_cors=True)
-
-
-
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 5b02c71d1e..f38c410e33 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -167,7 +167,8 @@ class Notifier(object):
                 )
 
         def eb(failure):
-            logger.error("Failed to notify listener",
+            logger.error(
+                "Failed to notify listener",
                 exc_info=(
                     failure.type,
                     failure.value,
@@ -207,7 +208,7 @@ class Notifier(object):
         )
 
         if timeout:
-            reactor.callLater(timeout/1000, self._timeout_listener, listener)
+            reactor.callLater(timeout/1000.0, self._timeout_listener, listener)
 
             self._register_with_keys(listener)
 
diff --git a/synapse/rest/profile.py b/synapse/rest/profile.py
index dad5a208c7..72e02d8dd8 100644
--- a/synapse/rest/profile.py
+++ b/synapse/rest/profile.py
@@ -108,9 +108,9 @@ class ProfileRestServlet(RestServlet):
         )
 
         defer.returnValue((200, {
-                                    "displayname": displayname,
-                                    "avatar_url": avatar_url
-                                }))
+            "displayname": displayname,
+            "avatar_url": avatar_url
+        }))
 
 
 def register_servlets(hs, http_server):
diff --git a/synapse/rest/register.py b/synapse/rest/register.py
index 804117ee09..5c15614ea9 100644
--- a/synapse/rest/register.py
+++ b/synapse/rest/register.py
@@ -60,40 +60,45 @@ class RegisterRestServlet(RestServlet):
 
     def on_GET(self, request):
         if self.hs.config.enable_registration_captcha:
-            return (200, {
-                "flows": [
+            return (
+                200,
+                {"flows": [
                     {
                         "type": LoginType.RECAPTCHA,
-                        "stages": ([LoginType.RECAPTCHA,
-                                    LoginType.EMAIL_IDENTITY,
-                                    LoginType.PASSWORD])
+                        "stages": [
+                            LoginType.RECAPTCHA,
+                            LoginType.EMAIL_IDENTITY,
+                            LoginType.PASSWORD
+                        ]
                     },
                     {
                         "type": LoginType.RECAPTCHA,
                         "stages": [LoginType.RECAPTCHA, LoginType.PASSWORD]
                     }
-                ]
-            })
+                ]}
+            )
         else:
-            return (200, {
-                "flows": [
+            return (
+                200,
+                {"flows": [
                     {
                         "type": LoginType.EMAIL_IDENTITY,
-                        "stages": ([LoginType.EMAIL_IDENTITY,
-                                    LoginType.PASSWORD])
+                        "stages": [
+                            LoginType.EMAIL_IDENTITY, LoginType.PASSWORD
+                        ]
                     },
                     {
                         "type": LoginType.PASSWORD
                     }
-                ]
-            })
+                ]}
+            )
 
     @defer.inlineCallbacks
     def on_POST(self, request):
         register_json = _parse_json(request)
 
-        session = (register_json["session"] if "session" in register_json
-                  else None)
+        session = (register_json["session"]
+                   if "session" in register_json else None)
         login_type = None
         if "type" not in register_json:
             raise SynapseError(400, "Missing 'type' key.")
@@ -122,7 +127,9 @@ class RegisterRestServlet(RestServlet):
             defer.returnValue((200, response))
         except KeyError as e:
             logger.exception(e)
-            raise SynapseError(400, "Missing JSON keys for login type %s." % login_type)
+            raise SynapseError(400, "Missing JSON keys for login type %s." % (
+                login_type,
+            ))
 
     def on_OPTIONS(self, request):
         return (200, {})
@@ -183,8 +190,10 @@ class RegisterRestServlet(RestServlet):
                 session["user"] = register_json["user"]
                 defer.returnValue(None)
             else:
-                raise SynapseError(400, "Captcha bypass HMAC incorrect",
-                    errcode=Codes.CAPTCHA_NEEDED)
+                raise SynapseError(
+                    400, "Captcha bypass HMAC incorrect",
+                    errcode=Codes.CAPTCHA_NEEDED
+                )
 
         challenge = None
         user_response = None
@@ -230,12 +239,15 @@ class RegisterRestServlet(RestServlet):
 
         if ("user" in session and "user" in register_json and
                 session["user"] != register_json["user"]):
-            raise SynapseError(400, "Cannot change user ID during registration")
+            raise SynapseError(
+                400, "Cannot change user ID during registration"
+            )
 
         password = register_json["password"].encode("utf-8")
-        desired_user_id = (register_json["user"].encode("utf-8") if "user"
-                          in register_json else None)
-        if desired_user_id and urllib.quote(desired_user_id) != desired_user_id:
+        desired_user_id = (register_json["user"].encode("utf-8")
+                           if "user" in register_json else None)
+        if (desired_user_id
+                and urllib.quote(desired_user_id) != desired_user_id):
             raise SynapseError(
                 400,
                 "User ID must only contain characters which do not " +
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index c72bdc2c34..ec0ce78fda 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -48,7 +48,9 @@ class RoomCreateRestServlet(RestServlet):
     @defer.inlineCallbacks
     def on_PUT(self, request, txn_id):
         try:
-            defer.returnValue(self.txns.get_client_transaction(request, txn_id))
+            defer.returnValue(
+                self.txns.get_client_transaction(request, txn_id)
+            )
         except KeyError:
             pass
 
@@ -98,8 +100,8 @@ class RoomStateEventRestServlet(RestServlet):
         no_state_key = "/rooms/(?P<room_id>[^/]*)/state/(?P<event_type>[^/]*)$"
 
         # /room/$roomid/state/$eventtype/$statekey
-        state_key = ("/rooms/(?P<room_id>[^/]*)/state/" +
-            "(?P<event_type>[^/]*)/(?P<state_key>[^/]*)$")
+        state_key = ("/rooms/(?P<room_id>[^/]*)/state/"
+                     "(?P<event_type>[^/]*)/(?P<state_key>[^/]*)$")
 
         http_server.register_path("GET",
                                   client_path_pattern(state_key),
@@ -133,7 +135,9 @@ class RoomStateEventRestServlet(RestServlet):
         )
 
         if not data:
-            raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
+            raise SynapseError(
+                404, "Event not found.", errcode=Codes.NOT_FOUND
+            )
         defer.returnValue((200, data[0].get_dict()["content"]))
 
     @defer.inlineCallbacks
@@ -195,7 +199,9 @@ class RoomSendEventRestServlet(RestServlet):
     @defer.inlineCallbacks
     def on_PUT(self, request, room_id, event_type, txn_id):
         try:
-            defer.returnValue(self.txns.get_client_transaction(request, txn_id))
+            defer.returnValue(
+                self.txns.get_client_transaction(request, txn_id)
+            )
         except KeyError:
             pass
 
@@ -254,7 +260,9 @@ class JoinRoomAliasServlet(RestServlet):
     @defer.inlineCallbacks
     def on_PUT(self, request, room_identifier, txn_id):
         try:
-            defer.returnValue(self.txns.get_client_transaction(request, txn_id))
+            defer.returnValue(
+                self.txns.get_client_transaction(request, txn_id)
+            )
         except KeyError:
             pass
 
@@ -293,7 +301,8 @@ class RoomMemberListRestServlet(RestServlet):
             target_user = self.hs.parse_userid(event["user_id"])
             # Presence is an optional cache; don't fail if we can't fetch it
             try:
-                presence_state = yield self.handlers.presence_handler.get_state(
+                presence_handler = self.handlers.presence_handler
+                presence_state = yield presence_handler.get_state(
                     target_user=target_user, auth_user=user
                 )
                 event["content"].update(presence_state)
@@ -359,11 +368,11 @@ class RoomInitialSyncRestServlet(RestServlet):
         #       { state event } , { state event }
         #   ]
         # }
-        # Probably worth keeping the keys room_id and membership for parity with
-        # /initialSync even though they must be joined to sync this and know the
-        # room ID, so clients can reuse the same code (room_id and membership
-        # are MANDATORY for /initialSync, so the code will expect it to be
-        # there)
+        # Probably worth keeping the keys room_id and membership for parity
+        # with /initialSync even though they must be joined to sync this and
+        # know the room ID, so clients can reuse the same code (room_id and
+        # membership are MANDATORY for /initialSync, so the code will expect
+        # it to be there)
         defer.returnValue((200, {}))
 
 
@@ -388,8 +397,8 @@ class RoomMembershipRestServlet(RestServlet):
 
     def register(self, http_server):
         # /rooms/$roomid/[invite|join|leave]
-        PATTERN = ("/rooms/(?P<room_id>[^/]*)/" +
-            "(?P<membership_action>join|invite|leave|ban|kick)")
+        PATTERN = ("/rooms/(?P<room_id>[^/]*)/"
+                   "(?P<membership_action>join|invite|leave|ban|kick)")
         register_txn_path(self, PATTERN, http_server)
 
     @defer.inlineCallbacks
@@ -422,7 +431,9 @@ class RoomMembershipRestServlet(RestServlet):
     @defer.inlineCallbacks
     def on_PUT(self, request, room_id, membership_action, txn_id):
         try:
-            defer.returnValue(self.txns.get_client_transaction(request, txn_id))
+            defer.returnValue(
+                self.txns.get_client_transaction(request, txn_id)
+            )
         except KeyError:
             pass
 
@@ -431,6 +442,7 @@ class RoomMembershipRestServlet(RestServlet):
         self.txns.store_client_transaction(request, txn_id, response)
         defer.returnValue(response)
 
+
 class RoomRedactEventRestServlet(RestServlet):
     def register(self, http_server):
         PATTERN = ("/rooms/(?P<room_id>[^/]*)/redact/(?P<event_id>[^/]*)")
@@ -457,7 +469,9 @@ class RoomRedactEventRestServlet(RestServlet):
     @defer.inlineCallbacks
     def on_PUT(self, request, room_id, event_id, txn_id):
         try:
-            defer.returnValue(self.txns.get_client_transaction(request, txn_id))
+            defer.returnValue(
+                self.txns.get_client_transaction(request, txn_id)
+            )
         except KeyError:
             pass
 
@@ -503,10 +517,10 @@ def register_txn_path(servlet, regex_string, http_server, with_get=False):
     )
     if with_get:
         http_server.register_path(
-        "GET",
-        client_path_pattern(regex_string + "/(?P<txn_id>[^/]*)$"),
-        servlet.on_GET
-    )
+            "GET",
+            client_path_pattern(regex_string + "/(?P<txn_id>[^/]*)$"),
+            servlet.on_GET
+        )
 
 
 def register_servlets(hs, http_server):
diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py
index e06dcc8c57..93c0122f30 100644
--- a/synapse/rest/transactions.py
+++ b/synapse/rest/transactions.py
@@ -30,9 +30,9 @@ class HttpTransactionStore(object):
         """Retrieve a response for this request.
 
         Args:
-            key (str): A transaction-independent key for this request. Typically
-            this is a combination of the path (without the transaction id) and
-            the user's access token.
+            key (str): A transaction-independent key for this request. Usually
+                this is a combination of the path (without the transaction id)
+                and the user's access token.
             txn_id (str): The transaction ID for this request
         Returns:
             A tuple of (HTTP response code, response content) or None.
@@ -51,9 +51,9 @@ class HttpTransactionStore(object):
         """Stores an HTTP response tuple.
 
         Args:
-            key (str): A transaction-independent key for this request. Typically
-            this is a combination of the path (without the transaction id) and
-            the user's access token.
+            key (str): A transaction-independent key for this request. Usually
+                this is a combination of the path (without the transaction id)
+                and the user's access token.
             txn_id (str): The transaction ID for this request.
             response (tuple): A tuple of (HTTP response code, response content)
         """
@@ -92,5 +92,3 @@ class HttpTransactionStore(object):
         token = request.args["access_token"][0]
         path_without_txn_id = request.path.rsplit("/", 1)[0]
         return path_without_txn_id + "/" + token
-
-
diff --git a/synapse/rest/voip.py b/synapse/rest/voip.py
index 0d0243a249..432c2475f8 100644
--- a/synapse/rest/voip.py
+++ b/synapse/rest/voip.py
@@ -34,23 +34,23 @@ class VoipRestServlet(RestServlet):
         turnSecret = self.hs.config.turn_shared_secret
         userLifetime = self.hs.config.turn_user_lifetime
         if not turnUris or not turnSecret or not userLifetime:
-            defer.returnValue( (200, {}) )
+            defer.returnValue((200, {}))
 
         expiry = (self.hs.get_clock().time_msec() + userLifetime) / 1000
         username = "%d:%s" % (expiry, auth_user.to_string())
-         
+
         mac = hmac.new(turnSecret, msg=username, digestmod=hashlib.sha1)
-        # We need to use standard base64 encoding here, *not* syutil's encode_base64
-        # because we need to add the standard padding to get the same result as the
-        # TURN server.
+        # We need to use standard base64 encoding here, *not* syutil's
+        # encode_base64 because we need to add the standard padding to get the
+        # same result as the TURN server.
         password = base64.b64encode(mac.digest())
 
-        defer.returnValue( (200, {
+        defer.returnValue((200, {
             'username': username,
             'password': password,
             'ttl': userLifetime / 1000,
             'uris': turnUris,
-        }) )
+        }))
 
     def on_OPTIONS(self, request):
         return (200, {})
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 4feb8335ba..fd705138e6 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -18,9 +18,10 @@ from _base import SQLBaseStore
 from twisted.internet import defer
 
 import OpenSSL
-from  syutil.crypto.signing_key import decode_verify_key_bytes
+from syutil.crypto.signing_key import decode_verify_key_bytes
 import hashlib
 
+
 class KeyStore(SQLBaseStore):
     """Persistence for signature verification keys and tls X.509 certificates
     """
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ceeef5880e..93329703a2 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -33,7 +33,9 @@ class RoomMemberStore(SQLBaseStore):
             target_user_id = event.state_key
             domain = self.hs.parse_userid(target_user_id).domain
         except:
-            logger.exception("Failed to parse target_user_id=%s", target_user_id)
+            logger.exception(
+                "Failed to parse target_user_id=%s", target_user_id
+            )
             raise
 
         logger.debug(
@@ -65,7 +67,8 @@ class RoomMemberStore(SQLBaseStore):
             # Check if this was the last person to have left.
             member_events = self._get_members_query_txn(
                 txn,
-                where_clause="c.room_id = ? AND m.membership = ? AND m.user_id != ?",
+                where_clause=("c.room_id = ? AND m.membership = ?"
+                              " AND m.user_id != ?"),
                 where_values=(event.room_id, Membership.JOIN, target_user_id,)
             )
 
@@ -120,7 +123,6 @@ class RoomMemberStore(SQLBaseStore):
         else:
             return None
 
-
     def get_room_members(self, room_id, membership=None):
         """Retrieve the current room member list for a room.
 
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 6483ce2e25..527507e5cd 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -22,6 +22,19 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+class SourcePaginationConfig(object):
+
+    """A configuration object which stores pagination parameters for a
+    specific event source."""
+
+    def __init__(self, from_key=None, to_key=None, direction='f',
+                 limit=0):
+        self.from_key = from_key
+        self.to_key = to_key
+        self.direction = 'f' if direction == 'f' else 'b'
+        self.limit = int(limit)
+
+
 class PaginationConfig(object):
 
     """A configuration object which stores pagination parameters."""
@@ -82,3 +95,13 @@ class PaginationConfig(object):
             "<PaginationConfig from_tok=%s, to_tok=%s, "
             "direction=%s, limit=%s>"
         ) % (self.from_token, self.to_token, self.direction, self.limit)
+
+    def get_source_config(self, source_name):
+        keyname = "%s_key" % source_name
+
+        return SourcePaginationConfig(
+            from_key=getattr(self.from_token, keyname),
+            to_key=getattr(self.to_token, keyname) if self.to_token else None,
+            direction=self.direction,
+            limit=self.limit,
+        )
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 41715436b0..fb698d2d71 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -35,7 +35,7 @@ class NullSource(object):
         return defer.succeed(0)
 
     def get_pagination_rows(self, user, pagination_config, key):
-        return defer.succeed(([], pagination_config.from_token))
+        return defer.succeed(([], pagination_config.from_key))
 
 
 class EventSources(object):
diff --git a/synapse/test_pyflakes.py b/synapse/test_pyflakes.py
deleted file mode 100644
index 7b5b1a0858..0000000000
--- a/synapse/test_pyflakes.py
+++ /dev/null
@@ -1 +0,0 @@
-import an_unused_module
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 1de50e049f..eddbe5837f 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -42,7 +42,8 @@ class Distributor(object):
         if name in self.signals:
             raise KeyError("%r already has a signal named %s" % (self, name))
 
-        self.signals[name] = Signal(name,
+        self.signals[name] = Signal(
+            name,
             suppress_failures=self.suppress_failures,
         )
 
diff --git a/synapse/util/emailutils.py b/synapse/util/emailutils.py
index cdb0abd7ea..7038cab6c2 100644
--- a/synapse/util/emailutils.py
+++ b/synapse/util/emailutils.py
@@ -42,8 +42,8 @@ def send_email(smtp_server, from_addr, to_addr, subject, body):
         EmailException if there was a problem sending the mail.
     """
     if not smtp_server or not from_addr or not to_addr:
-        raise EmailException("Need SMTP server, from and to addresses. Check " +
-                             "the config to set these.")
+        raise EmailException("Need SMTP server, from and to addresses. Check"
+                             " the config to set these.")
 
     msg = MIMEMultipart('alternative')
     msg['Subject'] = subject
@@ -68,4 +68,4 @@ def send_email(smtp_server, from_addr, to_addr, subject, body):
         twisted.python.log.err()
         ese = EmailException()
         ese.cause = origException
-        raise ese
\ No newline at end of file
+        raise ese
diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py
index 6c99705747..c91eb897a8 100644
--- a/synapse/util/jsonobject.py
+++ b/synapse/util/jsonobject.py
@@ -13,9 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import copy
 
+
 class JsonEncodedObject(object):
     """ A common base class for defining protocol units that are represented
     as JSON.
@@ -89,6 +89,7 @@ class JsonEncodedObject(object):
     def __str__(self):
         return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__))
 
+
 def _encode(obj):
     if type(obj) is list:
         return [_encode(o) for o in obj]