summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorKegan Dougal <kegan@matrix.org>2015-02-09 15:20:56 +0000
committerKegan Dougal <kegan@matrix.org>2015-02-09 15:20:56 +0000
commit53557fc53259631d71055ef2ab8a6916b00d229b (patch)
tree208db19fb5bbcc5416e669bc496adc49acdcb1a7 /synapse
parentFix bugs so lazy room joining works as intended. (diff)
parentMerge pull request #55 from matrix-org/profiling (diff)
downloadsynapse-53557fc53259631d71055ef2ab8a6916b00d229b.tar.xz
Merge branch 'develop' into application-services
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py4
-rw-r--r--synapse/handlers/federation.py146
-rw-r--r--synapse/handlers/register.py17
-rw-r--r--synapse/handlers/sync.py4
-rw-r--r--synapse/http/matrixfederationclient.py3
-rw-r--r--synapse/http/server.py23
-rw-r--r--synapse/push/__init__.py32
-rw-r--r--synapse/push/baserules.py12
-rw-r--r--synapse/rest/client/v1/__init__.py2
-rw-r--r--synapse/rest/client/v2_alpha/__init__.py2
-rw-r--r--synapse/rest/media/v1/base_resource.py2
-rw-r--r--synapse/rest/media/v1/media_repository.py1
-rw-r--r--synapse/rest/media/v1/upload_resource.py57
-rw-r--r--synapse/storage/_base.py25
-rw-r--r--synapse/storage/schema/delta/v12.sql11
-rw-r--r--synapse/storage/schema/delta/v13.sql24
-rw-r--r--synapse/util/__init__.py10
17 files changed, 238 insertions, 137 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index a9397de5b2..343ecea0fd 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -68,7 +68,7 @@ class SynapseHomeServer(HomeServer):
         return ClientV2AlphaRestResource(self)
 
     def build_resource_for_federation(self):
-        return JsonResource()
+        return JsonResource(self)
 
     def build_resource_for_app_services(self):
         return AppServiceRestResource(self)
@@ -279,6 +279,8 @@ def setup():
 
     hs.get_pusherpool().start()
 
+    hs.get_datastore().start_profiling()
+
     if config.daemonize:
         print config.pid_file
         daemon = Daemonize(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index aba266c2bc..b13b7c7701 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -130,7 +130,9 @@ class FederationHandler(BaseHandler):
         if auth_chain:
             event_ids |= {e.event_id for e in auth_chain}
 
-        seen_ids = (yield self.store.have_events(event_ids)).keys()
+        seen_ids = set(
+            (yield self.store.have_events(event_ids)).keys()
+        )
 
         if state and auth_chain is not None:
             # If we have any state or auth_chain given to us by the replication
@@ -332,8 +334,11 @@ class FederationHandler(BaseHandler):
 
             # Try the host we successfully got a response to /make_join/
             # request first.
-            target_hosts.remove(origin)
-            target_hosts.insert(0, origin)
+            try:
+                target_hosts.remove(origin)
+                target_hosts.insert(0, origin)
+            except ValueError:
+                pass
 
             ret = yield self.replication_layer.send_join(
                 target_hosts,
@@ -521,7 +526,7 @@ class FederationHandler(BaseHandler):
                     "Failed to get destination from event %s", s.event_id
                 )
 
-        destinations.remove(origin)
+        destinations.discard(origin)
 
         logger.debug(
             "on_send_join_request: Sending event: %s, signatures: %s",
@@ -786,12 +791,12 @@ class FederationHandler(BaseHandler):
     @log_function
     def do_auth(self, origin, event, context, auth_events):
         # Check if we have all the auth events.
-        res = yield self.store.have_events(
+        have_events = yield self.store.have_events(
             [e_id for e_id, _ in event.auth_events]
         )
 
         event_auth_events = set(e_id for e_id, _ in event.auth_events)
-        seen_events = set(res.keys())
+        seen_events = set(have_events.keys())
 
         missing_auth = event_auth_events - seen_events
 
@@ -834,6 +839,11 @@ class FederationHandler(BaseHandler):
                             auth_events[(e.type, e.state_key)] = e
                     except AuthError:
                         pass
+
+                have_events = yield self.store.have_events(
+                    [e_id for e_id, _ in event.auth_events]
+                )
+                seen_events = set(have_events.keys())
             except:
                 # FIXME:
                 logger.exception("Failed to get auth chain")
@@ -847,64 +857,75 @@ class FederationHandler(BaseHandler):
             # Do auth conflict res.
             logger.debug("Different auth: %s", different_auth)
 
-            # 1. Get what we think is the auth chain.
-            auth_ids = self.auth.compute_auth_events(
-                event, context.current_state
-            )
-            local_auth_chain = yield self.store.get_auth_chain(auth_ids)
-
-            try:
-                # 2. Get remote difference.
-                result = yield self.replication_layer.query_auth(
-                    origin,
-                    event.room_id,
-                    event.event_id,
-                    local_auth_chain,
+            # Only do auth resolution if we have something new to say.
+            # We can't rove an auth failure.
+            do_resolution = False
+            for e_id in different_auth:
+                if e_id in have_events:
+                    if have_events[e_id] != RejectedReason.AUTH_ERROR:
+                        do_resolution = True
+                        break
+
+            if do_resolution:
+                # 1. Get what we think is the auth chain.
+                auth_ids = self.auth.compute_auth_events(
+                    event, context.current_state
                 )
+                local_auth_chain = yield self.store.get_auth_chain(auth_ids)
 
-                seen_remotes = yield self.store.have_events(
-                    [e.event_id for e in result["auth_chain"]]
-                )
-
-                # 3. Process any remote auth chain events we haven't seen.
-                for ev in result["auth_chain"]:
-                    if ev.event_id in seen_remotes.keys():
-                        continue
-
-                    if ev.event_id == event.event_id:
-                        continue
-
-                    try:
-                        auth_ids = [e_id for e_id, _ in ev.auth_events]
-                        auth = {
-                            (e.type, e.state_key): e for e in result["auth_chain"]
-                            if e.event_id in auth_ids
-                        }
-                        ev.internal_metadata.outlier = True
-
-                        logger.debug(
-                            "do_auth %s different_auth: %s",
-                            event.event_id, e.event_id
-                        )
+                try:
+                    # 2. Get remote difference.
+                    result = yield self.replication_layer.query_auth(
+                        origin,
+                        event.room_id,
+                        event.event_id,
+                        local_auth_chain,
+                    )
 
-                        yield self._handle_new_event(
-                            origin, ev, auth_events=auth
-                        )
+                    seen_remotes = yield self.store.have_events(
+                        [e.event_id for e in result["auth_chain"]]
+                    )
 
-                        if ev.event_id in event_auth_events:
-                            auth_events[(ev.type, ev.state_key)] = ev
-                    except AuthError:
-                        pass
+                    # 3. Process any remote auth chain events we haven't seen.
+                    for ev in result["auth_chain"]:
+                        if ev.event_id in seen_remotes.keys():
+                            continue
+
+                        if ev.event_id == event.event_id:
+                            continue
+
+                        try:
+                            auth_ids = [e_id for e_id, _ in ev.auth_events]
+                            auth = {
+                                (e.type, e.state_key): e
+                                for e in result["auth_chain"]
+                                if e.event_id in auth_ids
+                            }
+                            ev.internal_metadata.outlier = True
+
+                            logger.debug(
+                                "do_auth %s different_auth: %s",
+                                event.event_id, e.event_id
+                            )
+
+                            yield self._handle_new_event(
+                                origin, ev, auth_events=auth
+                            )
+
+                            if ev.event_id in event_auth_events:
+                                auth_events[(ev.type, ev.state_key)] = ev
+                        except AuthError:
+                            pass
 
-            except:
-                # FIXME:
-                logger.exception("Failed to query auth chain")
+                except:
+                    # FIXME:
+                    logger.exception("Failed to query auth chain")
 
-            # 4. Look at rejects and their proofs.
-            # TODO.
+                # 4. Look at rejects and their proofs.
+                # TODO.
 
-            context.current_state.update(auth_events)
-            context.state_group = None
+                context.current_state.update(auth_events)
+                context.state_group = None
 
         try:
             self.auth.check(event, auth_events=auth_events)
@@ -1013,16 +1034,19 @@ class FederationHandler(BaseHandler):
         for e in missing_remotes:
             for e_id, _ in e.auth_events:
                 if e_id in missing_remote_ids:
-                    base_remote_rejected.remove(e)
+                    try:
+                        base_remote_rejected.remove(e)
+                    except ValueError:
+                        pass
 
         reason_map = {}
 
         for e in base_remote_rejected:
             reason = yield self.store.get_rejection_reason(e.event_id)
             if reason is None:
-                # FIXME: ERRR?!
-                logger.warn("Could not find reason for %s", e.event_id)
-                raise RuntimeError("Could not find reason for %s" % e.event_id)
+                # TODO: e is not in the current state, so we should
+                # construct some proof of that.
+                continue
 
             reason_map[e.event_id] = reason
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 10ab9bc99e..7ed0ce6299 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -104,6 +104,23 @@ class RegistrationHandler(BaseHandler):
                         raise RegistrationError(
                             500, "Cannot generate user ID.")
 
+        # create a default avatar for the user
+        # XXX: ideally clients would explicitly specify one, but given they don't
+        # and we want consistent and pretty identicons for random users, we'll
+        # do it here.
+        try:
+            auth_user = UserID.from_string(user_id)
+            identicon_resource = self.hs.get_resource_for_media_repository().getChildWithDefault("identicon", None)
+            upload_resource = self.hs.get_resource_for_media_repository().getChildWithDefault("upload", None)
+            identicon_bytes = identicon_resource.generate_identicon(user_id, 320, 320)
+            content_uri = yield upload_resource.create_content(
+                "image/png", None, identicon_bytes, len(identicon_bytes), auth_user
+            )
+            profile_handler = self.hs.get_handlers().profile_handler
+            profile_handler.set_avatar_url(auth_user, auth_user, ("%s#auto" % content_uri))
+        except NotImplementedError:
+            pass # make tests pass without messing around creating default avatars
+        
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 962686f4bb..439164ae39 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -298,15 +298,17 @@ class SyncHandler(BaseHandler):
         load_limit = max(sync_config.limit * filtering_factor, 100)
         max_repeat = 3  # Only try a few times per room, otherwise
         room_key = now_token.room_key
+        end_key = room_key
 
         while limited and len(recents) < sync_config.limit and max_repeat:
             events, keys = yield self.store.get_recent_events_for_room(
                 room_id,
                 limit=load_limit + 1,
                 from_token=since_token.room_key if since_token else None,
-                end_token=room_key,
+                end_token=end_key,
             )
             (room_key, _) = keys
+            end_key = "s" + room_key.split('-')[-1]
             loaded_recents = sync_config.filter.filter_room_events(events)
             loaded_recents.extend(recents)
             recents = loaded_recents
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 8559d06b7f..056d446e42 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -169,8 +169,9 @@ class MatrixFederationHttpClient(object):
         else:
             # :'(
             # Update transactions table?
+            body = yield readBody(response)
             raise HttpResponseException(
-                response.code, response.phrase, response
+                response.code, response.phrase, body
             )
 
         defer.returnValue(response)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 0f6539e1be..6d084fa33c 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -69,9 +69,10 @@ class JsonResource(HttpServer, resource.Resource):
 
     _PathEntry = collections.namedtuple("_PathEntry", ["pattern", "callback"])
 
-    def __init__(self):
+    def __init__(self, hs):
         resource.Resource.__init__(self)
 
+        self.clock = hs.get_clock()
         self.path_regexs = {}
 
     def register_path(self, method, path_pattern, callback):
@@ -111,6 +112,7 @@ class JsonResource(HttpServer, resource.Resource):
             This checks if anyone has registered a callback for that method and
             path.
         """
+        code = None
         try:
             # Just say yes to OPTIONS.
             if request.method == "OPTIONS":
@@ -130,6 +132,13 @@ class JsonResource(HttpServer, resource.Resource):
                         urllib.unquote(u).decode("UTF-8") for u in m.groups()
                     ]
 
+                    logger.info(
+                        "Received request: %s %s",
+                        request.method, request.path
+                    )
+
+                    start = self.clock.time_msec()
+
                     code, response = yield path_entry.callback(
                         request,
                         *args
@@ -145,9 +154,11 @@ class JsonResource(HttpServer, resource.Resource):
                 logger.info("%s SynapseError: %s - %s", request, e.code, e.msg)
             else:
                 logger.exception(e)
+
+            code = e.code
             self._send_response(
                 request,
-                e.code,
+                code,
                 cs_exception(e),
                 response_code_message=e.response_code_message
             )
@@ -158,6 +169,14 @@ class JsonResource(HttpServer, resource.Resource):
                 500,
                 {"error": "Internal server error"}
             )
+        finally:
+            code = str(code) if code else "-"
+
+            end = self.clock.time_msec()
+            logger.info(
+                "Processed request: %dms %s %s %s",
+                end-start, code, request.method, request.path
+            )
 
     def _send_response(self, request, code, response_json_object,
                        response_code_message=None):
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 7293715293..07b5f0187c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -22,7 +22,6 @@ import synapse.util.async
 import baserules
 
 import logging
-import fnmatch
 import json
 import re
 
@@ -130,26 +129,35 @@ class Pusher(object):
 
         defer.returnValue(Pusher.DEFAULT_ACTIONS)
 
+    @staticmethod
+    def _glob_to_regexp(glob):
+        r = re.escape(glob)
+        r = re.sub(r'\\\*', r'.*?', r)
+        r = re.sub(r'\\\?', r'.', r)
+
+        # handle [abc], [a-z] and [!a-z] style ranges.
+        r = re.sub(r'\\\[(\\\!|)(.*)\\\]',
+                   lambda x: ('[%s%s]' % (x.group(1) and '^' or '',
+                                          re.sub(r'\\\-', '-', x.group(2)))), r)
+        return r
+    
     def _event_fulfills_condition(self, ev, condition, display_name, room_member_count):
         if condition['kind'] == 'event_match':
             if 'pattern' not in condition:
                 logger.warn("event_match condition with no pattern")
                 return False
-            pat = condition['pattern']
-
-            if pat.strip("*?[]") == pat:
-                # no special glob characters so we assume the user means
-                # 'contains this string' rather than 'is this string'
-                pat = "*%s*" % (pat,)
-
+            # XXX: optimisation: cache our pattern regexps
+            r = r'\b%s\b' % self._glob_to_regexp(condition['pattern'])
             val = _value_for_dotted_key(condition['key'], ev)
             if val is None:
                 return False
-            return fnmatch.fnmatch(val.upper(), pat.upper())
+            return re.match(r, val, flags=re.IGNORECASE) != None
+
         elif condition['kind'] == 'device':
             if 'profile_tag' not in condition:
                 return True
             return condition['profile_tag'] == self.profile_tag
+
         elif condition['kind'] == 'contains_display_name':
             # This is special because display names can be different
             # between rooms and so you can't really hard code it in a rule.
@@ -159,9 +167,9 @@ class Pusher(object):
                 return False
             if not display_name:
                 return False
-            return fnmatch.fnmatch(
-                ev['content']['body'].upper(), "*%s*" % (display_name.upper(),)
-            )
+            return re.match("\b%s\b" % re.escape(display_name),
+                            ev['content']['body'], flags=re.IGNORECASE) != None
+
         elif condition['kind'] == 'room_member_count':
             if 'is' not in condition:
                 return False
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 8d4b806da6..37878f1e0b 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -4,24 +4,24 @@ def list_with_base_rules(rawrules, user_name):
     ruleslist = []
 
     # shove the server default rules for each kind onto the end of each
-    current_prio_class = 1
+    current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
     for r in rawrules:
-        if r['priority_class'] > current_prio_class:
-            while current_prio_class < r['priority_class']:
+        if r['priority_class'] < current_prio_class:
+            while r['priority_class'] < current_prio_class:
                 ruleslist.extend(make_base_rules(
                         user_name,
                         PRIORITY_CLASS_INVERSE_MAP[current_prio_class])
                     )
-                current_prio_class += 1
+                current_prio_class -= 1
 
         ruleslist.append(r)
 
-    while current_prio_class <= PRIORITY_CLASS_INVERSE_MAP.keys()[-1]:
+    while current_prio_class > 0:
         ruleslist.extend(make_base_rules(
             user_name,
             PRIORITY_CLASS_INVERSE_MAP[current_prio_class])
         )
-        current_prio_class += 1
+        current_prio_class -= 1
 
     return ruleslist
 
diff --git a/synapse/rest/client/v1/__init__.py b/synapse/rest/client/v1/__init__.py
index d8d01cdd16..21876b3487 100644
--- a/synapse/rest/client/v1/__init__.py
+++ b/synapse/rest/client/v1/__init__.py
@@ -25,7 +25,7 @@ class ClientV1RestResource(JsonResource):
     """A resource for version 1 of the matrix client API."""
 
     def __init__(self, hs):
-        JsonResource.__init__(self)
+        JsonResource.__init__(self, hs)
         self.register_servlets(self, hs)
 
     @staticmethod
diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py
index 8f611de3a8..bca65f2a6a 100644
--- a/synapse/rest/client/v2_alpha/__init__.py
+++ b/synapse/rest/client/v2_alpha/__init__.py
@@ -25,7 +25,7 @@ class ClientV2AlphaRestResource(JsonResource):
     """A resource for version 2 alpha of the matrix client API."""
 
     def __init__(self, hs):
-        JsonResource.__init__(self)
+        JsonResource.__init__(self, hs)
         self.register_servlets(self, hs)
 
     @staticmethod
diff --git a/synapse/rest/media/v1/base_resource.py b/synapse/rest/media/v1/base_resource.py
index 688e7376ad..d44d5f1298 100644
--- a/synapse/rest/media/v1/base_resource.py
+++ b/synapse/rest/media/v1/base_resource.py
@@ -82,7 +82,7 @@ class BaseMediaResource(Resource):
             raise SynapseError(
                 404,
                 "Invalid media id token %r" % (request.postpath,),
-                Codes.UNKKOWN,
+                Codes.UNKNOWN,
             )
 
     @staticmethod
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 61ed90f39f..9ca4d884dd 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -34,6 +34,7 @@ class MediaRepositoryResource(Resource):
 
         => POST /_matrix/media/v1/upload HTTP/1.1
            Content-Type: <media-type>
+           Content-Length: <content-length>
 
            <media>
 
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index b939a30e19..5b42782331 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -38,6 +38,35 @@ class UploadResource(BaseMediaResource):
     def render_OPTIONS(self, request):
         respond_with_json(request, 200, {}, send_cors=True)
         return NOT_DONE_YET
+        
+    @defer.inlineCallbacks
+    def create_content(self, media_type, upload_name, content, content_length, auth_user):
+        media_id = random_string(24)
+
+        fname = self.filepaths.local_media_filepath(media_id)
+        self._makedirs(fname)
+
+        # This shouldn't block for very long because the content will have
+        # already been uploaded at this point.
+        with open(fname, "wb") as f:
+            f.write(content)
+
+        yield self.store.store_local_media(
+            media_id=media_id,
+            media_type=media_type,
+            time_now_ms=self.clock.time_msec(),
+            upload_name=upload_name,
+            media_length=content_length,
+            user_id=auth_user,
+        )
+        media_info = {
+            "media_type": media_type,
+            "media_length": content_length,
+        }
+
+        yield self._generate_local_thumbnails(media_id, media_info)
+        
+        defer.returnValue("mxc://%s/%s" % (self.server_name, media_id))
 
     @defer.inlineCallbacks
     def _async_render_POST(self, request):
@@ -70,32 +99,10 @@ class UploadResource(BaseMediaResource):
             #    disposition = headers.getRawHeaders("Content-Disposition")[0]
             # TODO(markjh): parse content-dispostion
 
-            media_id = random_string(24)
-
-            fname = self.filepaths.local_media_filepath(media_id)
-            self._makedirs(fname)
-
-            # This shouldn't block for very long because the content will have
-            # already been uploaded at this point.
-            with open(fname, "wb") as f:
-                f.write(request.content.read())
-
-            yield self.store.store_local_media(
-                media_id=media_id,
-                media_type=media_type,
-                time_now_ms=self.clock.time_msec(),
-                upload_name=None,
-                media_length=content_length,
-                user_id=auth_user,
+            content_uri = yield self.create_content(
+                media_type, None, request.content.read(),
+                content_length, auth_user
             )
-            media_info = {
-                "media_type": media_type,
-                "media_length": content_length,
-            }
-
-            yield self._generate_local_thumbnails(media_id, media_info)
-
-            content_uri = "mxc://%s/%s" % (self.server_name, media_id)
 
             respond_with_json(
                 request, 200, {"content_uri": content_uri}, send_cors=True
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b350fd61f1..310ee0104c 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -85,6 +85,28 @@ class SQLBaseStore(object):
         self._db_pool = hs.get_db_pool()
         self._clock = hs.get_clock()
 
+        self._previous_txn_total_time = 0
+        self._current_txn_total_time = 0
+        self._previous_loop_ts = 0
+
+    def start_profiling(self):
+        self._previous_loop_ts = self._clock.time_msec()
+
+        def loop():
+            curr = self._current_txn_total_time
+            prev = self._previous_txn_total_time
+            self._previous_txn_total_time = curr
+
+            time_now = self._clock.time_msec()
+            time_then = self._previous_loop_ts
+            self._previous_loop_ts = time_now
+
+            ratio = (curr - prev)/(time_now - time_then)
+
+            logger.info("Total database time: %.3f%%", ratio * 100)
+
+        self._clock.looping_call(loop, 10000)
+
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
         """Wraps the .runInteraction() method on the underlying db_pool."""
@@ -114,6 +136,9 @@ class SQLBaseStore(object):
                         "[TXN END] {%s} %f",
                         name, end - start
                     )
+
+                    self._current_txn_total_time += end - start
+
         with PreserveLoggingContext():
             result = yield self._db_pool.runInteraction(
                 inner_func, *args, **kwargs
diff --git a/synapse/storage/schema/delta/v12.sql b/synapse/storage/schema/delta/v12.sql
index 16c2258ca4..302d958dbf 100644
--- a/synapse/storage/schema/delta/v12.sql
+++ b/synapse/storage/schema/delta/v12.sql
@@ -52,3 +52,14 @@ CREATE TABLE IF NOT EXISTS push_rules (
 );
 
 CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
+
+CREATE TABLE IF NOT EXISTS user_filters(
+  user_id TEXT,
+  filter_id INTEGER,
+  filter_json TEXT,
+  FOREIGN KEY(user_id) REFERENCES users(id)
+);
+
+CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
+  user_id, filter_id
+);
diff --git a/synapse/storage/schema/delta/v13.sql b/synapse/storage/schema/delta/v13.sql
deleted file mode 100644
index beb39ca201..0000000000
--- a/synapse/storage/schema/delta/v13.sql
+++ /dev/null
@@ -1,24 +0,0 @@
-/* Copyright 2015 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-CREATE TABLE IF NOT EXISTS user_filters(
-  user_id TEXT,
-  filter_id INTEGER,
-  filter_json TEXT,
-  FOREIGN KEY(user_id) REFERENCES users(id)
-);
-
-CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
-  user_id, filter_id
-);
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 4e837a918e..fee76b0a9b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -15,7 +15,7 @@
 
 from synapse.util.logcontext import LoggingContext
 
-from twisted.internet import reactor
+from twisted.internet import reactor, task
 
 import time
 
@@ -35,6 +35,14 @@ class Clock(object):
         """Returns the current system time in miliseconds since epoch."""
         return self.time() * 1000
 
+    def looping_call(self, f, msec):
+        l = task.LoopingCall(f)
+        l.start(msec/1000.0, now=False)
+        return l
+
+    def stop_looping_call(self, loop):
+        loop.stop()
+
     def call_later(self, delay, callback):
         current_context = LoggingContext.current_context()