summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py25
-rwxr-xr-xsynapse/app/homeserver.py10
-rw-r--r--synapse/appservice/__init__.py60
-rw-r--r--synapse/appservice/api.py22
-rw-r--r--synapse/appservice/scheduler.py254
-rw-r--r--synapse/config/appservice.py31
-rw-r--r--synapse/config/homeserver.py3
-rw-r--r--synapse/config/registration.py20
-rw-r--r--synapse/config/server.py2
-rw-r--r--synapse/events/__init__.py6
-rw-r--r--synapse/handlers/__init__.py8
-rw-r--r--synapse/handlers/appservice.py69
-rw-r--r--synapse/handlers/federation.py40
-rw-r--r--synapse/handlers/presence.py8
-rw-r--r--synapse/handlers/typing.py16
-rw-r--r--synapse/metrics/__init__.py35
-rw-r--r--synapse/notifier.py85
-rw-r--r--synapse/python_dependencies.py6
-rw-r--r--synapse/rest/appservice/v1/base.py48
-rw-r--r--synapse/rest/appservice/v1/register.py99
-rw-r--r--synapse/rest/key/__init__.py (renamed from synapse/rest/appservice/__init__.py)0
-rw-r--r--synapse/rest/key/v1/__init__.py (renamed from synapse/rest/appservice/v1/__init__.py)15
-rw-r--r--synapse/rest/key/v1/server_key_resource.py (renamed from synapse/http/server_key_resource.py)0
-rw-r--r--synapse/server.py1
-rw-r--r--synapse/storage/__init__.py8
-rw-r--r--synapse/storage/_base.py70
-rw-r--r--synapse/storage/appservice.py457
-rw-r--r--synapse/storage/events.py4
-rw-r--r--synapse/storage/schema/delta/15/appservice_txns.sql30
-rw-r--r--synapse/util/expiringcache.py2
-rw-r--r--synapse/util/lrucache.py7
32 files changed, 970 insertions, 473 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 749a60329c..fd87c7e2d0 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.8.1-r2"
+__version__ = "0.8.1-r3"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 0bf35109cd..3d2b45d217 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -216,17 +216,20 @@ class Auth(object):
         else:
             ban_level = 50  # FIXME (erikj): What should we do here?
 
-        if Membership.INVITE == membership:
-            # TODO (erikj): We should probably handle this more intelligently
-            # PRIVATE join rules.
-
-            # Invites are valid iff caller is in the room and target isn't.
+        if Membership.JOIN != membership:
+            # JOIN is the only action you can perform if you're not in the room
             if not caller_in_room:  # caller isn't joined
                 raise AuthError(
                     403,
                     "%s not in room %s." % (event.user_id, event.room_id,)
                 )
-            elif target_banned:
+
+        if Membership.INVITE == membership:
+            # TODO (erikj): We should probably handle this more intelligently
+            # PRIVATE join rules.
+
+            # Invites are valid iff caller is in the room and target isn't.
+            if target_banned:
                 raise AuthError(
                     403, "%s is banned from the room" % (target_user_id,)
                 )
@@ -252,13 +255,7 @@ class Auth(object):
                 raise AuthError(403, "You are not allowed to join this room")
         elif Membership.LEAVE == membership:
             # TODO (erikj): Implement kicks.
-
-            if not caller_in_room:  # trying to leave a room you aren't joined
-                raise AuthError(
-                    403,
-                    "%s not in room %s." % (target_user_id, event.room_id,)
-                )
-            elif target_banned and user_level < ban_level:
+            if target_banned and user_level < ban_level:
                 raise AuthError(
                     403, "You cannot unban user &s." % (target_user_id,)
                 )
@@ -493,7 +490,7 @@ class Auth(object):
             send_level = send_level_event.content.get("events", {}).get(
                 event.type
             )
-            if not send_level:
+            if send_level is None:
                 if hasattr(event, "state_key"):
                     send_level = send_level_event.content.get(
                         "state_default", 50
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 500cae05fb..27e53a9e56 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -32,15 +32,13 @@ from twisted.web.resource import Resource
 from twisted.web.static import File
 from twisted.web.server import Site
 from synapse.http.server import JsonResource, RootRedirect
-from synapse.rest.appservice.v1 import AppServiceRestResource
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.http.server_key_resource import LocalKey
+from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.api.urls import (
     CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
-    SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, APP_SERVICE_PREFIX,
-    STATIC_PREFIX
+    SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, STATIC_PREFIX
 )
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
@@ -78,9 +76,6 @@ class SynapseHomeServer(HomeServer):
     def build_resource_for_federation(self):
         return JsonResource(self)
 
-    def build_resource_for_app_services(self):
-        return AppServiceRestResource(self)
-
     def build_resource_for_web_client(self):
         import syweb
         syweb_path = os.path.dirname(syweb.__file__)
@@ -141,7 +136,6 @@ class SynapseHomeServer(HomeServer):
             (CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()),
             (SERVER_KEY_PREFIX, self.get_resource_for_server_key()),
             (MEDIA_PREFIX, self.get_resource_for_media_repository()),
-            (APP_SERVICE_PREFIX, self.get_resource_for_app_services()),
             (STATIC_PREFIX, self.get_resource_for_static_content()),
         ]
 
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index a268a6bcc4..63a18b802b 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -20,6 +20,50 @@ import re
 logger = logging.getLogger(__name__)
 
 
+class ApplicationServiceState(object):
+    DOWN = "down"
+    UP = "up"
+
+
+class AppServiceTransaction(object):
+    """Represents an application service transaction."""
+
+    def __init__(self, service, id, events):
+        self.service = service
+        self.id = id
+        self.events = events
+
+    def send(self, as_api):
+        """Sends this transaction using the provided AS API interface.
+
+        Args:
+            as_api(ApplicationServiceApi): The API to use to send.
+        Returns:
+            A Deferred which resolves to True if the transaction was sent.
+        """
+        return as_api.push_bulk(
+            service=self.service,
+            events=self.events,
+            txn_id=self.id
+        )
+
+    def complete(self, store):
+        """Completes this transaction as successful.
+
+        Marks this transaction ID on the application service and removes the
+        transaction contents from the database.
+
+        Args:
+            store: The database store to operate on.
+        Returns:
+            A Deferred which resolves to True if the transaction was completed.
+        """
+        return store.complete_appservice_txn(
+            service=self.service,
+            txn_id=self.id
+        )
+
+
 class ApplicationService(object):
     """Defines an application service. This definition is mostly what is
     provided to the /register AS API.
@@ -35,13 +79,13 @@ class ApplicationService(object):
     NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
 
     def __init__(self, token, url=None, namespaces=None, hs_token=None,
-                 sender=None, txn_id=None):
+                 sender=None, id=None):
         self.token = token
         self.url = url
         self.hs_token = hs_token
         self.sender = sender
         self.namespaces = self._check_namespaces(namespaces)
-        self.txn_id = txn_id
+        self.id = id
 
     def _check_namespaces(self, namespaces):
         # Sanity check that it is of the form:
@@ -51,7 +95,7 @@ class ApplicationService(object):
         #   rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...],
         # }
         if not namespaces:
-            return None
+            namespaces = {}
 
         for ns in ApplicationService.NS_LIST:
             if ns not in namespaces:
@@ -155,7 +199,10 @@ class ApplicationService(object):
             return self._matches_user(event, member_list)
 
     def is_interested_in_user(self, user_id):
-        return self._matches_regex(user_id, ApplicationService.NS_USERS)
+        return (
+            self._matches_regex(user_id, ApplicationService.NS_USERS)
+            or user_id == self.sender
+        )
 
     def is_interested_in_alias(self, alias):
         return self._matches_regex(alias, ApplicationService.NS_ALIASES)
@@ -164,7 +211,10 @@ class ApplicationService(object):
         return self._matches_regex(room_id, ApplicationService.NS_ROOMS)
 
     def is_exclusive_user(self, user_id):
-        return self._is_exclusive(ApplicationService.NS_USERS, user_id)
+        return (
+            self._is_exclusive(ApplicationService.NS_USERS, user_id)
+            or user_id == self.sender
+        )
 
     def is_exclusive_alias(self, alias):
         return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index c2179f8d55..2a9becccb3 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -72,14 +72,19 @@ class ApplicationServiceApi(SimpleHttpClient):
         defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def push_bulk(self, service, events):
+    def push_bulk(self, service, events, txn_id=None):
         events = self._serialize(events)
 
+        if txn_id is None:
+            logger.warning("push_bulk: Missing txn ID sending events to %s",
+                           service.url)
+            txn_id = str(0)
+        txn_id = str(txn_id)
+
         uri = service.url + ("/transactions/%s" %
-                             urllib.quote(str(0)))  # TODO txn_ids
-        response = None
+                             urllib.quote(txn_id))
         try:
-            response = yield self.put_json(
+            yield self.put_json(
                 uri=uri,
                 json_body={
                     "events": events
@@ -87,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient):
                 args={
                     "access_token": service.hs_token
                 })
-            if response:  # just an empty json object
-                # TODO: Mark txn as sent successfully
-                defer.returnValue(True)
+            defer.returnValue(True)
+            return
         except CodeMessageException as e:
             logger.warning("push_bulk to %s received %s", uri, e.code)
         except Exception as ex:
@@ -97,8 +101,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def push(self, service, event):
-        response = yield self.push_bulk(service, [event])
+    def push(self, service, event, txn_id=None):
+        response = yield self.push_bulk(service, [event], txn_id)
         defer.returnValue(response)
 
     def _serialize(self, events):
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
new file mode 100644
index 0000000000..59b0b1f4ac
--- /dev/null
+++ b/synapse/appservice/scheduler.py
@@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module controls the reliability for application service transactions.
+
+The nominal flow through this module looks like:
+              __________
+1---ASa[e]-->|  Service |--> Queue ASa[f]
+2----ASb[e]->|  Queuer  |
+3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
+                                    V
+      -````````-            +------------+
+      |````````|<--StoreTxn-|Transaction |
+      |Database|            | Controller |---> SEND TO AS
+      `--------`            +------------+
+What happens on SEND TO AS depends on the state of the Application Service:
+ - If the AS is marked as DOWN, do nothing.
+ - If the AS is marked as UP, send the transaction.
+     * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn
+                 contents from the db.
+     * FAILURE : Marked AS as DOWN and start Recoverer.
+
+Recoverer attempts to recover ASes who have died. The flow for this looks like:
+                ,--------------------- backoff++ --------------.
+               V                                               |
+  START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
+             backoff           DB and try to send it
+                                 ^                |___________
+Mark AS as                       |                            V
+UP & quit           +---------- YES                       SUCCESS
+    |               |                                         |
+    NO <--- Have more txns? <------ Mark txn success & nuke <-+
+                                      from db; incr AS pos.
+                                         Reset backoff.
+
+This is all tied together by the AppServiceScheduler which DIs the required
+components.
+"""
+
+from synapse.appservice import ApplicationServiceState
+from twisted.internet import defer
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class AppServiceScheduler(object):
+    """ Public facing API for this module. Does the required DI to tie the
+    components together. This also serves as the "event_pool", which in this
+    case is a simple array.
+    """
+
+    def __init__(self, clock, store, as_api):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+
+        def create_recoverer(service, callback):
+            return _Recoverer(clock, store, as_api, service, callback)
+
+        self.txn_ctrl = _TransactionController(
+            clock, store, as_api, create_recoverer
+        )
+        self.queuer = _ServiceQueuer(self.txn_ctrl)
+
+    @defer.inlineCallbacks
+    def start(self):
+        logger.info("Starting appservice scheduler")
+        # check for any DOWN ASes and start recoverers for them.
+        recoverers = yield _Recoverer.start(
+            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        )
+        self.txn_ctrl.add_recoverers(recoverers)
+
+    def submit_event_for_as(self, service, event):
+        self.queuer.enqueue(service, event)
+
+
+class _ServiceQueuer(object):
+    """Queues events for the same application service together, sending
+    transactions as soon as possible. Once a transaction is sent successfully,
+    this schedules any other events in the queue to run.
+    """
+
+    def __init__(self, txn_ctrl):
+        self.queued_events = {}  # dict of {service_id: [events]}
+        self.pending_requests = {}  # dict of {service_id: Deferred}
+        self.txn_ctrl = txn_ctrl
+
+    def enqueue(self, service, event):
+        # if this service isn't being sent something
+        if not self.pending_requests.get(service.id):
+            self._send_request(service, [event])
+        else:
+            # add to queue for this service
+            if service.id not in self.queued_events:
+                self.queued_events[service.id] = []
+            self.queued_events[service.id].append(event)
+
+    def _send_request(self, service, events):
+        # send request and add callbacks
+        d = self.txn_ctrl.send(service, events)
+        d.addBoth(self._on_request_finish)
+        d.addErrback(self._on_request_fail)
+        self.pending_requests[service.id] = d
+
+    def _on_request_finish(self, service):
+        self.pending_requests[service.id] = None
+        # if there are queued events, then send them.
+        if (service.id in self.queued_events
+                and len(self.queued_events[service.id]) > 0):
+            self._send_request(service, self.queued_events[service.id])
+            self.queued_events[service.id] = []
+
+    def _on_request_fail(self, err):
+        logger.error("AS request failed: %s", err)
+
+
+class _TransactionController(object):
+
+    def __init__(self, clock, store, as_api, recoverer_fn):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.recoverer_fn = recoverer_fn
+        # keep track of how many recoverers there are
+        self.recoverers = []
+
+    @defer.inlineCallbacks
+    def send(self, service, events):
+        try:
+            txn = yield self.store.create_appservice_txn(
+                service=service,
+                events=events
+            )
+            service_is_up = yield self._is_service_up(service)
+            if service_is_up:
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    txn.complete(self.store)
+                else:
+                    self._start_recoverer(service)
+        except Exception as e:
+            logger.exception(e)
+            self._start_recoverer(service)
+        # request has finished
+        defer.returnValue(service)
+
+    @defer.inlineCallbacks
+    def on_recovered(self, recoverer):
+        self.recoverers.remove(recoverer)
+        logger.info("Successfully recovered application service AS ID %s",
+                    recoverer.service.id)
+        logger.info("Remaining active recoverers: %s", len(self.recoverers))
+        yield self.store.set_appservice_state(
+            recoverer.service,
+            ApplicationServiceState.UP
+        )
+
+    def add_recoverers(self, recoverers):
+        for r in recoverers:
+            self.recoverers.append(r)
+        if len(recoverers) > 0:
+            logger.info("New active recoverers: %s", len(self.recoverers))
+
+    @defer.inlineCallbacks
+    def _start_recoverer(self, service):
+        yield self.store.set_appservice_state(
+            service,
+            ApplicationServiceState.DOWN
+        )
+        logger.info(
+            "Application service falling behind. Starting recoverer. AS ID %s",
+            service.id
+        )
+        recoverer = self.recoverer_fn(service, self.on_recovered)
+        self.add_recoverers([recoverer])
+        recoverer.recover()
+
+    @defer.inlineCallbacks
+    def _is_service_up(self, service):
+        state = yield self.store.get_appservice_state(service)
+        defer.returnValue(state == ApplicationServiceState.UP or state is None)
+
+
+class _Recoverer(object):
+
+    @staticmethod
+    @defer.inlineCallbacks
+    def start(clock, store, as_api, callback):
+        services = yield store.get_appservices_by_state(
+            ApplicationServiceState.DOWN
+        )
+        recoverers = [
+            _Recoverer(clock, store, as_api, s, callback) for s in services
+        ]
+        for r in recoverers:
+            logger.info("Starting recoverer for AS ID %s which was marked as "
+                        "DOWN", r.service.id)
+            r.recover()
+        defer.returnValue(recoverers)
+
+    def __init__(self, clock, store, as_api, service, callback):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.service = service
+        self.callback = callback
+        self.backoff_counter = 1
+
+    def recover(self):
+        self.clock.call_later((2 ** self.backoff_counter), self.retry)
+
+    def _backoff(self):
+        # cap the backoff to be around 18h => (2^16) = 65536 secs
+        if self.backoff_counter < 16:
+            self.backoff_counter += 1
+        self.recover()
+
+    @defer.inlineCallbacks
+    def retry(self):
+        try:
+            txn = yield self.store.get_oldest_unsent_txn(self.service)
+            if txn:
+                logger.info("Retrying transaction %s for AS ID %s",
+                            txn.id, txn.service.id)
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    yield txn.complete(self.store)
+                    # reset the backoff counter and retry immediately
+                    self.backoff_counter = 1
+                    yield self.retry()
+                else:
+                    self._backoff()
+            else:
+                self._set_service_recovered()
+        except Exception as e:
+            logger.exception(e)
+            self._backoff()
+
+    def _set_service_recovered(self):
+        self.callback(self)
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
new file mode 100644
index 0000000000..399a716d80
--- /dev/null
+++ b/synapse/config/appservice.py
@@ -0,0 +1,31 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class AppServiceConfig(Config):
+
+    def __init__(self, args):
+        super(AppServiceConfig, self).__init__(args)
+        self.app_service_config_files = args.app_service_config_files
+
+    @classmethod
+    def add_arguments(cls, parser):
+        super(AppServiceConfig, cls).add_arguments(parser)
+        group = parser.add_argument_group("appservice")
+        group.add_argument(
+            "--app-service-config-files", type=str, nargs='+',
+            help="A list of application service config files to use."
+        )
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 241afdf872..3edfadb98b 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -24,12 +24,13 @@ from .email import EmailConfig
 from .voip import VoipConfig
 from .registration import RegistrationConfig
 from .metrics import MetricsConfig
+from .appservice import AppServiceConfig
 
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
                        EmailConfig, VoipConfig, RegistrationConfig,
-                       MetricsConfig,):
+                       MetricsConfig, AppServiceConfig,):
     pass
 
 
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 4401e774d1..d5c8f4bf7b 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -25,11 +25,11 @@ class RegistrationConfig(Config):
     def __init__(self, args):
         super(RegistrationConfig, self).__init__(args)
 
-        # `args.disable_registration` may either be a bool or a string depending
-        # on if the option was given a value (e.g. --disable-registration=false
-        # would set `args.disable_registration` to "false" not False.)
-        self.disable_registration = bool(
-            distutils.util.strtobool(str(args.disable_registration))
+        # `args.enable_registration` may either be a bool or a string depending
+        # on if the option was given a value (e.g. --enable-registration=true
+        # would set `args.enable_registration` to "true" not True.)
+        self.disable_registration = not bool(
+            distutils.util.strtobool(str(args.enable_registration))
         )
         self.registration_shared_secret = args.registration_shared_secret
 
@@ -39,11 +39,11 @@ class RegistrationConfig(Config):
         reg_group = parser.add_argument_group("registration")
 
         reg_group.add_argument(
-            "--disable-registration",
+            "--enable-registration",
             const=True,
-            default=True,
+            default=False,
             nargs='?',
-            help="Disable registration of new users.",
+            help="Enable registration for new users.",
         )
         reg_group.add_argument(
             "--registration-shared-secret", type=str,
@@ -53,8 +53,8 @@ class RegistrationConfig(Config):
 
     @classmethod
     def generate_config(cls, args, config_dir_path):
-        if args.disable_registration is None:
-            args.disable_registration = True
+        if args.enable_registration is None:
+            args.enable_registration = False
 
         if args.registration_shared_secret is None:
             args.registration_shared_secret = random_string_with_symbols(50)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 58a828cc4c..d4c223f348 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -110,7 +110,7 @@ class ServerConfig(Config):
             with open(args.signing_key_path, "w") as signing_key_file:
                 syutil.crypto.signing_key.write_signing_keys(
                     signing_key_file,
-                    (syutil.crypto.signing_key.generate_singing_key("auto"),),
+                    (syutil.crypto.signing_key.generate_signing_key("auto"),),
                 )
         else:
             signing_keys = cls.read_file(args.signing_key_path, "signing_key")
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 64e08223b0..e4495ccf12 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -46,9 +46,10 @@ def _event_dict_property(key):
 
 class EventBase(object):
     def __init__(self, event_dict, signatures={}, unsigned={},
-                 internal_metadata_dict={}):
+                 internal_metadata_dict={}, rejected_reason=None):
         self.signatures = signatures
         self.unsigned = unsigned
+        self.rejected_reason = rejected_reason
 
         self._event_dict = event_dict
 
@@ -109,7 +110,7 @@ class EventBase(object):
 
 
 class FrozenEvent(EventBase):
-    def __init__(self, event_dict, internal_metadata_dict={}):
+    def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
         event_dict = dict(event_dict)
 
         # Signatures is a dict of dicts, and this is faster than doing a
@@ -128,6 +129,7 @@ class FrozenEvent(EventBase):
             signatures=signatures,
             unsigned=unsigned,
             internal_metadata_dict=internal_metadata_dict,
+            rejected_reason=rejected_reason,
         )
 
     @staticmethod
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index d1b0e032a3..685792dbdc 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.appservice.scheduler import AppServiceScheduler
 from synapse.appservice.api import ApplicationServiceApi
 from .register import RegistrationHandler
 from .room import (
@@ -56,8 +57,13 @@ class Handlers(object):
         self.directory_handler = DirectoryHandler(hs)
         self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
+        asapi = ApplicationServiceApi(hs)
         self.appservice_handler = ApplicationServicesHandler(
-            hs, ApplicationServiceApi(hs)
+            hs, asapi, AppServiceScheduler(
+                clock=hs.get_clock(),
+                store=hs.get_datastore(),
+                as_api=asapi
+            )
         )
         self.sync_handler = SyncHandler(hs)
         self.auth_handler = AuthHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 2c488a46f6..492a630fdc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -16,57 +16,36 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import Codes, StoreError, SynapseError
 from synapse.appservice import ApplicationService
 from synapse.types import UserID
-import synapse.util.stringutils as stringutils
 
 import logging
 
 logger = logging.getLogger(__name__)
 
 
+def log_failure(failure):
+    logger.error(
+        "Application Services Failure",
+        exc_info=(
+            failure.type,
+            failure.value,
+            failure.getTracebackObject()
+        )
+    )
+
+
 # NB: Purposefully not inheriting BaseHandler since that contains way too much
 # setup code which this handler does not need or use. This makes testing a lot
 # easier.
 class ApplicationServicesHandler(object):
 
-    def __init__(self, hs, appservice_api):
+    def __init__(self, hs, appservice_api, appservice_scheduler):
         self.store = hs.get_datastore()
         self.hs = hs
         self.appservice_api = appservice_api
-
-    @defer.inlineCallbacks
-    def register(self, app_service):
-        logger.info("Register -> %s", app_service)
-        # check the token is recognised
-        try:
-            stored_service = yield self.store.get_app_service_by_token(
-                app_service.token
-            )
-            if not stored_service:
-                raise StoreError(404, "Application service not found")
-        except StoreError:
-            raise SynapseError(
-                403, "Unrecognised application services token. "
-                "Consult the home server admin.",
-                errcode=Codes.FORBIDDEN
-            )
-
-        app_service.hs_token = self._generate_hs_token()
-
-        # create a sender for this application service which is used when
-        # creating rooms, etc..
-        account = yield self.hs.get_handlers().registration_handler.register()
-        app_service.sender = account[0]
-
-        yield self.store.update_app_service(app_service)
-        defer.returnValue(app_service)
-
-    @defer.inlineCallbacks
-    def unregister(self, token):
-        logger.info("Unregister as_token=%s", token)
-        yield self.store.unregister_app_service(token)
+        self.scheduler = appservice_scheduler
+        self.started_scheduler = False
 
     @defer.inlineCallbacks
     def notify_interested_services(self, event):
@@ -90,9 +69,13 @@ class ApplicationServicesHandler(object):
         if event.type == EventTypes.Member:
             yield self._check_user_exists(event.state_key)
 
-        # Fork off pushes to these services - XXX First cut, best effort
+        if not self.started_scheduler:
+            self.scheduler.start().addErrback(log_failure)
+            self.started_scheduler = True
+
+        # Fork off pushes to these services
         for service in services:
-            self.appservice_api.push(service, event)
+            self.scheduler.submit_event_for_as(service, event)
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):
@@ -197,7 +180,14 @@ class ApplicationServicesHandler(object):
             return
 
         user_info = yield self.store.get_user_by_id(user_id)
-        defer.returnValue(len(user_info) == 0)
+        if len(user_info) > 0:
+            defer.returnValue(False)
+            return
+
+        # user not found; could be the AS though, so check.
+        services = yield self.store.get_app_services()
+        service_list = [s for s in services if s.sender == user_id]
+        defer.returnValue(len(service_list) == 0)
 
     @defer.inlineCallbacks
     def _check_user_exists(self, user_id):
@@ -206,6 +196,3 @@ class ApplicationServicesHandler(object):
             exists = yield self.query_user_exists(user_id)
             defer.returnValue(exists)
         defer.returnValue(True)
-
-    def _generate_hs_token(self):
-        return stringutils.random_string(24)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 15ba417e06..8aceac28cf 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -201,10 +201,18 @@ class FederationHandler(BaseHandler):
                 target_user = UserID.from_string(target_user_id)
                 extra_users.append(target_user)
 
-            yield self.notifier.on_new_room_event(
+            d = self.notifier.on_new_room_event(
                 event, extra_users=extra_users
             )
 
+            def log_failure(f):
+                logger.warn(
+                    "Failed to notify about %s: %s",
+                    event.event_id, f.value
+                )
+
+            d.addErrback(log_failure)
+
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
@@ -427,10 +435,18 @@ class FederationHandler(BaseHandler):
                 auth_events=auth_events,
             )
 
-            yield self.notifier.on_new_room_event(
+            d = self.notifier.on_new_room_event(
                 new_event, extra_users=[joinee]
             )
 
+            def log_failure(f):
+                logger.warn(
+                    "Failed to notify about %s: %s",
+                    new_event.event_id, f.value
+                )
+
+            d.addErrback(log_failure)
+
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
@@ -500,10 +516,18 @@ class FederationHandler(BaseHandler):
             target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
-        yield self.notifier.on_new_room_event(
+        d = self.notifier.on_new_room_event(
             event, extra_users=extra_users
         )
 
+        def log_failure(f):
+            logger.warn(
+                "Failed to notify about %s: %s",
+                event.event_id, f.value
+            )
+
+        d.addErrback(log_failure)
+
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
@@ -574,10 +598,18 @@ class FederationHandler(BaseHandler):
         )
 
         target_user = UserID.from_string(event.state_key)
-        yield self.notifier.on_new_room_event(
+        d = self.notifier.on_new_room_event(
             event, extra_users=[target_user],
         )
 
+        def log_failure(f):
+            logger.warn(
+                "Failed to notify about %s: %s",
+                event.event_id, f.value
+            )
+
+        d.addErrback(log_failure)
+
         defer.returnValue(event)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 731df00648..bbc7a0f200 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,6 +33,10 @@ logger = logging.getLogger(__name__)
 metrics = synapse.metrics.get_metrics_for(__name__)
 
 
+# Don't bother bumping "last active" time if it differs by less than 60 seconds
+LAST_ACTIVE_GRANULARITY = 60*1000
+
+
 # TODO(paul): Maybe there's one of these I can steal from somewhere
 def partition(l, func):
     """Partition the list by the result of func applied to each element."""
@@ -282,6 +286,10 @@ class PresenceHandler(BaseHandler):
         if now is None:
             now = self.clock.time_msec()
 
+        prev_state = self._get_or_make_usercache(user)
+        if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY:
+            return
+
         self.changed_presencelike_data(user, {"last_active": now})
 
     def changed_presencelike_data(self, user, state):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c2762f92c7..c0b2bd7db0 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -223,6 +223,7 @@ class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
         self._handler = None
+        self._room_member_handler = None
 
     def handler(self):
         # Avoid cyclic dependency in handler setup
@@ -230,6 +231,11 @@ class TypingNotificationEventSource(object):
             self._handler = self.hs.get_handlers().typing_notification_handler
         return self._handler
 
+    def room_member_handler(self):
+        if not self._room_member_handler:
+            self._room_member_handler = self.hs.get_handlers().room_member_handler
+        return self._room_member_handler
+
     def _make_event_for(self, room_id):
         typing = self.handler()._room_typing[room_id]
         return {
@@ -240,19 +246,25 @@ class TypingNotificationEventSource(object):
             },
         }
 
+    @defer.inlineCallbacks
     def get_new_events_for_user(self, user, from_key, limit):
         from_key = int(from_key)
         handler = self.handler()
 
+        joined_room_ids = (
+            yield self.room_member_handler().get_joined_rooms_for_user(user)
+        )
+
         events = []
         for room_id in handler._room_serials:
+            if room_id not in joined_room_ids:
+                continue
             if handler._room_serials[room_id] <= from_key:
                 continue
 
-            # TODO: check if user is in room
             events.append(self._make_event_for(room_id))
 
-        return (events, handler._latest_room_serial)
+        defer.returnValue((events, handler._latest_room_serial))
 
     def get_current_key(self):
         return self.handler()._latest_room_serial
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index dffb8a4861..9233ea3da9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,6 +18,8 @@ from __future__ import absolute_import
 
 import logging
 from resource import getrusage, getpagesize, RUSAGE_SELF
+import os
+import stat
 
 from .metric import (
     CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
@@ -109,3 +111,36 @@ resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000)
 
 # pages
 resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * PAGE_SIZE)
+
+TYPES = {
+    stat.S_IFSOCK: "SOCK",
+    stat.S_IFLNK: "LNK",
+    stat.S_IFREG: "REG",
+    stat.S_IFBLK: "BLK",
+    stat.S_IFDIR: "DIR",
+    stat.S_IFCHR: "CHR",
+    stat.S_IFIFO: "FIFO",
+}
+
+
+def _process_fds():
+    counts = {(k,): 0 for k in TYPES.values()}
+    counts[("other",)] = 0
+
+    for fd in os.listdir("/proc/self/fd"):
+        try:
+            s = os.stat("/proc/self/fd/%s" % (fd))
+            fmt = stat.S_IFMT(s.st_mode)
+            if fmt in TYPES:
+                t = TYPES[fmt]
+            else:
+                t = "other"
+
+            counts[(t,)] += 1
+        except OSError:
+            # the dirh itself used by listdir() is usually missing by now
+            pass
+
+    return counts
+
+get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 7121d659d0..d750a6fcf7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -59,10 +59,11 @@ class _NotificationListener(object):
         self.limit = limit
         self.timeout = timeout
         self.deferred = deferred
-
         self.rooms = rooms
+        self.timer = None
 
-        self.pending_notifications = []
+    def notified(self):
+        return self.deferred.called
 
     def notify(self, notifier, events, start_token, end_token):
         """ Inform whoever is listening about the new events. This will
@@ -78,16 +79,27 @@ class _NotificationListener(object):
         except defer.AlreadyCalledError:
             pass
 
+        # Should the following be done be using intrusively linked lists?
+        # -- erikj
+
         for room in self.rooms:
             lst = notifier.room_to_listeners.get(room, set())
             lst.discard(self)
 
         notifier.user_to_listeners.get(self.user, set()).discard(self)
+
         if self.appservice:
             notifier.appservice_to_listeners.get(
                 self.appservice, set()
             ).discard(self)
 
+        # Cancel the timeout for this notifer if one exists.
+        if self.timer is not None:
+            try:
+                notifier.clock.cancel_call_later(self.timer)
+            except:
+                logger.exception("Failed to cancel notifier timer")
+
 
 class Notifier(object):
     """ This class is responsible for notifying any listeners when there are
@@ -161,10 +173,18 @@ class Notifier(object):
 
         room_source = self.event_sources.sources["room"]
 
-        listeners = self.room_to_listeners.get(room_id, set()).copy()
+        room_listeners = self.room_to_listeners.get(room_id, set())
+
+        _discard_if_notified(room_listeners)
+
+        listeners = room_listeners.copy()
 
         for user in extra_users:
-            listeners |= self.user_to_listeners.get(user, set()).copy()
+            user_listeners = self.user_to_listeners.get(user, set())
+
+            _discard_if_notified(user_listeners)
+
+            listeners |= user_listeners
 
         for appservice in self.appservice_to_listeners:
             # TODO (kegan): Redundant appservice listener checks?
@@ -173,9 +193,13 @@ class Notifier(object):
             # receive *invites* for users they are interested in. Does this
             # make the room_to_listeners check somewhat obselete?
             if appservice.is_interested(event):
-                listeners |= self.appservice_to_listeners.get(
+                app_listeners = self.appservice_to_listeners.get(
                     appservice, set()
-                ).copy()
+                )
+
+                _discard_if_notified(app_listeners)
+
+                listeners |= app_listeners
 
         logger.debug("on_new_room_event listeners %s", listeners)
 
@@ -226,10 +250,18 @@ class Notifier(object):
         listeners = set()
 
         for user in users:
-            listeners |= self.user_to_listeners.get(user, set()).copy()
+            user_listeners = self.user_to_listeners.get(user, set())
+
+            _discard_if_notified(user_listeners)
+
+            listeners |= user_listeners
 
         for room in rooms:
-            listeners |= self.room_to_listeners.get(room, set()).copy()
+            room_listeners = self.room_to_listeners.get(room, set())
+
+            _discard_if_notified(room_listeners)
+
+            listeners |= room_listeners
 
         @defer.inlineCallbacks
         def notify(listener):
@@ -300,14 +332,20 @@ class Notifier(object):
             self._register_with_keys(listener[0])
 
         result = yield callback()
+        timer = [None]
+
         if timeout:
             timed_out = [False]
 
             def _timeout_listener():
                 timed_out[0] = True
+                timer[0] = None
                 listener[0].notify(self, [], from_token, from_token)
 
-            self.clock.call_later(timeout/1000., _timeout_listener)
+            # We create multiple notification listeners so we have to manage
+            # canceling the timeout ourselves.
+            timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
+
             while not result and not timed_out[0]:
                 yield deferred
                 deferred = defer.Deferred()
@@ -322,6 +360,12 @@ class Notifier(object):
                 self._register_with_keys(listener[0])
                 result = yield callback()
 
+        if timer[0] is not None:
+            try:
+                self.clock.cancel_call_later(timer[0])
+            except:
+                logger.exception("Failed to cancel notifer timer")
+
         defer.returnValue(result)
 
     def get_events_for(self, user, rooms, pagination_config, timeout):
@@ -360,6 +404,8 @@ class Notifier(object):
         def _timeout_listener():
             # TODO (erikj): We should probably set to_token to the current
             # max rather than reusing from_token.
+            # Remove the timer from the listener so we don't try to cancel it.
+            listener.timer = None
             listener.notify(
                 self,
                 [],
@@ -375,8 +421,11 @@ class Notifier(object):
         if not timeout:
             _timeout_listener()
         else:
-            self.clock.call_later(timeout/1000.0, _timeout_listener)
-
+            # Only add the timer if the listener hasn't been notified
+            if not listener.notified():
+                listener.timer = self.clock.call_later(
+                    timeout/1000.0, _timeout_listener
+                )
         return
 
     @log_function
@@ -427,3 +476,17 @@ class Notifier(object):
 
         listeners = self.room_to_listeners.setdefault(room_id, set())
         listeners |= new_listeners
+
+        for l in new_listeners:
+            l.rooms.add(room_id)
+
+
+def _discard_if_notified(listener_set):
+    """Remove any 'stale' listeners from the given set.
+    """
+    to_discard = set()
+    for l in listener_set:
+        if l.notified():
+            to_discard.add(l)
+
+    listener_set -= to_discard
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 6b6d5508b8..dac927d0a7 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -4,7 +4,7 @@ from distutils.version import LooseVersion
 logger = logging.getLogger(__name__)
 
 REQUIREMENTS = {
-    "syutil>=0.0.3": ["syutil"],
+    "syutil>=0.0.4": ["syutil"],
     "Twisted==14.0.2": ["twisted==14.0.2"],
     "service_identity>=1.0.0": ["service_identity>=1.0.0"],
     "pyopenssl>=0.14": ["OpenSSL>=0.14"],
@@ -43,8 +43,8 @@ DEPENDENCY_LINKS = [
     ),
     github_link(
         project="matrix-org/syutil",
-        version="v0.0.3",
-        egg="syutil-0.0.3",
+        version="v0.0.4",
+        egg="syutil-0.0.4",
     ),
     github_link(
         project="matrix-org/matrix-angular-sdk",
diff --git a/synapse/rest/appservice/v1/base.py b/synapse/rest/appservice/v1/base.py
deleted file mode 100644
index 65d5bcf9be..0000000000
--- a/synapse/rest/appservice/v1/base.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""This module contains base REST classes for constructing client v1 servlets.
-"""
-
-from synapse.http.servlet import RestServlet
-from synapse.api.urls import APP_SERVICE_PREFIX
-import re
-
-import logging
-
-
-logger = logging.getLogger(__name__)
-
-
-def as_path_pattern(path_regex):
-    """Creates a regex compiled appservice path with the correct path
-    prefix.
-
-    Args:
-        path_regex (str): The regex string to match. This should NOT have a ^
-        as this will be prefixed.
-    Returns:
-        SRE_Pattern
-    """
-    return re.compile("^" + APP_SERVICE_PREFIX + path_regex)
-
-
-class AppServiceRestServlet(RestServlet):
-    """A base Synapse REST Servlet for the application services version 1 API.
-    """
-
-    def __init__(self, hs):
-        self.hs = hs
-        self.handler = hs.get_handlers().appservice_handler
diff --git a/synapse/rest/appservice/v1/register.py b/synapse/rest/appservice/v1/register.py
deleted file mode 100644
index ea24d88f79..0000000000
--- a/synapse/rest/appservice/v1/register.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
-#
-# Licensensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""This module contains REST servlets to do with registration: /register"""
-from twisted.internet import defer
-
-from base import AppServiceRestServlet, as_path_pattern
-from synapse.api.errors import CodeMessageException, SynapseError
-from synapse.storage.appservice import ApplicationService
-
-import json
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class RegisterRestServlet(AppServiceRestServlet):
-    """Handles AS registration with the home server.
-    """
-
-    PATTERN = as_path_pattern("/register$")
-
-    @defer.inlineCallbacks
-    def on_POST(self, request):
-        params = _parse_json(request)
-
-        # sanity check required params
-        try:
-            as_token = params["as_token"]
-            as_url = params["url"]
-            if (not isinstance(as_token, basestring) or
-                    not isinstance(as_url, basestring)):
-                raise ValueError
-        except (KeyError, ValueError):
-            raise SynapseError(
-                400, "Missed required keys: as_token(str) / url(str)."
-            )
-
-        try:
-            app_service = ApplicationService(
-                as_token, as_url, params["namespaces"]
-            )
-        except ValueError as e:
-            raise SynapseError(400, e.message)
-
-        app_service = yield self.handler.register(app_service)
-        hs_token = app_service.hs_token
-
-        defer.returnValue((200, {
-            "hs_token": hs_token
-        }))
-
-
-class UnregisterRestServlet(AppServiceRestServlet):
-    """Handles AS registration with the home server.
-    """
-
-    PATTERN = as_path_pattern("/unregister$")
-
-    def on_POST(self, request):
-        params = _parse_json(request)
-        try:
-            as_token = params["as_token"]
-            if not isinstance(as_token, basestring):
-                raise ValueError
-        except (KeyError, ValueError):
-            raise SynapseError(400, "Missing required key: as_token(str)")
-
-        yield self.handler.unregister(as_token)
-
-        raise CodeMessageException(500, "Not implemented")
-
-
-def _parse_json(request):
-    try:
-        content = json.loads(request.content.read())
-        if type(content) != dict:
-            raise SynapseError(400, "Content must be a JSON object.")
-        return content
-    except ValueError as e:
-        logger.warn(e)
-        raise SynapseError(400, "Content not JSON.")
-
-
-def register_servlets(hs, http_server):
-    RegisterRestServlet(hs).register(http_server)
-    UnregisterRestServlet(hs).register(http_server)
diff --git a/synapse/rest/appservice/__init__.py b/synapse/rest/key/__init__.py
index 1a84d94cd9..1a84d94cd9 100644
--- a/synapse/rest/appservice/__init__.py
+++ b/synapse/rest/key/__init__.py
diff --git a/synapse/rest/appservice/v1/__init__.py b/synapse/rest/key/v1/__init__.py
index a7877609ad..1a84d94cd9 100644
--- a/synapse/rest/appservice/v1/__init__.py
+++ b/synapse/rest/key/v1/__init__.py
@@ -12,18 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from . import register
-
-from synapse.http.server import JsonResource
-
-
-class AppServiceRestResource(JsonResource):
-    """A resource for version 1 of the matrix application service API."""
-
-    def __init__(self, hs):
-        JsonResource.__init__(self, hs)
-        self.register_servlets(self, hs)
-
-    @staticmethod
-    def register_servlets(appservice_resource, hs):
-        register.register_servlets(hs, appservice_resource)
diff --git a/synapse/http/server_key_resource.py b/synapse/rest/key/v1/server_key_resource.py
index 71e9a51f5c..71e9a51f5c 100644
--- a/synapse/http/server_key_resource.py
+++ b/synapse/rest/key/v1/server_key_resource.py
diff --git a/synapse/server.py b/synapse/server.py
index 4c4f6ca239..af87dab12c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -80,7 +80,6 @@ class BaseHomeServer(object):
         'resource_for_content_repo',
         'resource_for_server_key',
         'resource_for_media_repository',
-        'resource_for_app_services',
         'resource_for_metrics',
         'event_sources',
         'ratelimiter',
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index c69d11261c..f4dec70393 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,9 +14,10 @@
 # limitations under the License.
 
 from twisted.internet import defer
-
+from .appservice import (
+    ApplicationServiceStore, ApplicationServiceTransactionStore
+)
 from ._base import Cache
-from .appservice import ApplicationServiceStore
 from .directory import DirectoryStore
 from .events import EventsStore
 from .presence import PresenceStore
@@ -50,7 +51,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 14
+SCHEMA_VERSION = 15
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -71,6 +72,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 FilteringStore,
                 PusherStore,
                 PushRuleStore,
+                ApplicationServiceTransactionStore,
                 EventsStore,
                 ):
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 27ea65a0f6..e3e67d8e0d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)
 
 sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
+perf_logger = logging.getLogger("synapse.storage.TIME")
 
 
 metrics = synapse.metrics.get_metrics_for("synapse.storage")
@@ -55,10 +56,14 @@ cache_counter = metrics.register_cache(
 
 class Cache(object):
 
-    def __init__(self, name, max_entries=1000, keylen=1):
-        self.cache = OrderedDict()
+    def __init__(self, name, max_entries=1000, keylen=1, lru=False):
+        if lru:
+            self.cache = LruCache(max_size=max_entries)
+            self.max_entries = None
+        else:
+            self.cache = OrderedDict()
+            self.max_entries = max_entries
 
-        self.max_entries = max_entries
         self.name = name
         self.keylen = keylen
 
@@ -82,8 +87,9 @@ class Cache(object):
         if len(keyargs) != self.keylen:
             raise ValueError("Expected a key to have %d items", self.keylen)
 
-        while len(self.cache) > self.max_entries:
-            self.cache.popitem(last=False)
+        if self.max_entries is not None:
+            while len(self.cache) >= self.max_entries:
+                self.cache.popitem(last=False)
 
         self.cache[keyargs] = value
 
@@ -94,9 +100,7 @@ class Cache(object):
         self.cache.pop(keyargs, None)
 
 
-# TODO(paul):
-#  * consider other eviction strategies - LRU?
-def cached(max_entries=1000, num_args=1):
+def cached(max_entries=1000, num_args=1, lru=False):
     """ A method decorator that applies a memoizing cache around the function.
 
     The function is presumed to take zero or more arguments, which are used in
@@ -115,6 +119,7 @@ def cached(max_entries=1000, num_args=1):
             name=orig.__name__,
             max_entries=max_entries,
             keylen=num_args,
+            lru=lru,
         )
 
         @functools.wraps(orig)
@@ -237,10 +242,8 @@ class SQLBaseStore(object):
         self._txn_perf_counters = PerformanceCounters()
         self._get_event_counters = PerformanceCounters()
 
-        self._get_event_cache = LruCache(hs.config.event_cache_size)
-
-        # Pretend the getEventCache is just another named cache
-        caches_by_name["*getEvent*"] = self._get_event_cache
+        self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
+                                      max_entries=hs.config.event_cache_size)
 
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
@@ -264,7 +267,7 @@ class SQLBaseStore(object):
                 time_now - time_then, limit=3
             )
 
-            logger.info(
+            perf_logger.info(
                 "Total database time: %.3f%% {%s} {%s}",
                 ratio * 100, top_three_counters, top_3_event_counters
             )
@@ -728,6 +731,12 @@ class SQLBaseStore(object):
 
         return [e for e in events if e]
 
+    def _invalidate_get_event_cache(self, event_id):
+        for check_redacted in (False, True):
+            for get_prev_content in (False, True):
+                self._get_event_cache.invalidate(event_id, check_redacted,
+                                                 get_prev_content)
+
     def _get_event_txn(self, txn, event_id, check_redacted=True,
                        get_prev_content=False, allow_rejected=False):
 
@@ -738,16 +747,14 @@ class SQLBaseStore(object):
             sql_getevents_timer.inc_by(curr_time - last_time, desc)
             return curr_time
 
-        cache = self._get_event_cache.setdefault(event_id, {})
-
         try:
-            # Separate cache entries for each way to invoke _get_event_txn
-            ret = cache[(check_redacted, get_prev_content, allow_rejected)]
+            ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content)
 
-            cache_counter.inc_hits("*getEvent*")
-            return ret
+            if allow_rejected or not ret.rejected_reason:
+                return ret
+            else:
+                return None
         except KeyError:
-            cache_counter.inc_misses("*getEvent*")
             pass
         finally:
             start_time = update_counter("event_cache", start_time)
@@ -772,19 +779,22 @@ class SQLBaseStore(object):
 
         start_time = update_counter("select_event", start_time)
 
+        result = self._get_event_from_row_txn(
+            txn, internal_metadata, js, redacted,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            rejected_reason=rejected_reason,
+        )
+        self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result)
+
         if allow_rejected or not rejected_reason:
-            result = self._get_event_from_row_txn(
-                txn, internal_metadata, js, redacted,
-                check_redacted=check_redacted,
-                get_prev_content=get_prev_content,
-            )
-            cache[(check_redacted, get_prev_content, allow_rejected)] = result
             return result
         else:
             return None
 
     def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
-                                check_redacted=True, get_prev_content=False):
+                                check_redacted=True, get_prev_content=False,
+                                rejected_reason=None):
 
         start_time = time.time() * 1000
 
@@ -799,7 +809,11 @@ class SQLBaseStore(object):
         internal_metadata = json.loads(internal_metadata)
         start_time = update_counter("decode_internal", start_time)
 
-        ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
+        ev = FrozenEvent(
+            d,
+            internal_metadata_dict=internal_metadata,
+            rejected_reason=rejected_reason,
+        )
         start_time = update_counter("build_frozen_event", start_time)
 
         if check_redacted and redacted:
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 850676ce6c..f8cbb3f323 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -13,154 +13,35 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson
+import urllib
+import yaml
 from simplejson import JSONDecodeError
+import simplejson as json
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
-from synapse.api.errors import StoreError
-from synapse.appservice import ApplicationService
+from synapse.appservice import ApplicationService, AppServiceTransaction
 from synapse.storage.roommember import RoomsForUser
+from synapse.types import UserID
 from ._base import SQLBaseStore
 
 
 logger = logging.getLogger(__name__)
 
 
-def log_failure(failure):
-    logger.error("Failed to detect application services: %s", failure.value)
-    logger.error(failure.getTraceback())
-
-
 class ApplicationServiceStore(SQLBaseStore):
 
     def __init__(self, hs):
         super(ApplicationServiceStore, self).__init__(hs)
+        self.hostname = hs.hostname
         self.services_cache = []
-        self.cache_defer = self._populate_cache()
-        self.cache_defer.addErrback(log_failure)
-
-    @defer.inlineCallbacks
-    def unregister_app_service(self, token):
-        """Unregisters this service.
-
-        This removes all AS specific regex and the base URL. The token is the
-        only thing preserved for future registration attempts.
-        """
-        yield self.cache_defer  # make sure the cache is ready
-        yield self.runInteraction(
-            "unregister_app_service",
-            self._unregister_app_service_txn,
-            token,
-        )
-        # update cache TODO: Should this be in the txn?
-        for service in self.services_cache:
-            if service.token == token:
-                service.url = None
-                service.namespaces = None
-                service.hs_token = None
-
-    def _unregister_app_service_txn(self, txn, token):
-        # kill the url to prevent pushes
-        txn.execute(
-            "UPDATE application_services SET url=NULL WHERE token=?",
-            (token,)
-        )
-
-        # cleanup regex
-        as_id = self._get_as_id_txn(txn, token)
-        if not as_id:
-            logger.warning(
-                "unregister_app_service_txn: Failed to find as_id for token=",
-                token
-            )
-            return False
-
-        txn.execute(
-            "DELETE FROM application_services_regex WHERE as_id=?",
-            (as_id,)
+        self._populate_appservice_cache(
+            hs.config.app_service_config_files
         )
-        return True
 
-    @defer.inlineCallbacks
-    def update_app_service(self, service):
-        """Update an application service, clobbering what was previously there.
-
-        Args:
-            service(ApplicationService): The updated service.
-        """
-        yield self.cache_defer  # make sure the cache is ready
-
-        # NB: There is no "insert" since we provide no public-facing API to
-        # allocate new ASes. It relies on the server admin inserting the AS
-        # token into the database manually.
-
-        if not service.token or not service.url:
-            raise StoreError(400, "Token and url must be specified.")
-
-        if not service.hs_token:
-            raise StoreError(500, "No HS token")
-
-        yield self.runInteraction(
-            "update_app_service",
-            self._update_app_service_txn,
-            service
-        )
-
-        # update cache TODO: Should this be in the txn?
-        for (index, cache_service) in enumerate(self.services_cache):
-            if service.token == cache_service.token:
-                self.services_cache[index] = service
-                logger.info("Updated: %s", service)
-                return
-        # new entry
-        self.services_cache.append(service)
-        logger.info("Updated(new): %s", service)
-
-    def _update_app_service_txn(self, txn, service):
-        as_id = self._get_as_id_txn(txn, service.token)
-        if not as_id:
-            logger.warning(
-                "update_app_service_txn: Failed to find as_id for token=",
-                service.token
-            )
-            return False
-
-        txn.execute(
-            "UPDATE application_services SET url=?, hs_token=?, sender=? "
-            "WHERE id=?",
-            (service.url, service.hs_token, service.sender, as_id,)
-        )
-        # cleanup regex
-        txn.execute(
-            "DELETE FROM application_services_regex WHERE as_id=?",
-            (as_id,)
-        )
-        for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST):
-            if ns_str in service.namespaces:
-                for regex_obj in service.namespaces[ns_str]:
-                    txn.execute(
-                        "INSERT INTO application_services_regex("
-                        "as_id, namespace, regex) values(?,?,?)",
-                        (as_id, ns_int, simplejson.dumps(regex_obj))
-                    )
-        return True
-
-    def _get_as_id_txn(self, txn, token):
-        cursor = txn.execute(
-            "SELECT id FROM application_services WHERE token=?",
-            (token,)
-        )
-        res = cursor.fetchone()
-        if res:
-            return res[0]
-
-    @defer.inlineCallbacks
     def get_app_services(self):
-        yield self.cache_defer  # make sure the cache is ready
-        defer.returnValue(self.services_cache)
+        return defer.succeed(self.services_cache)
 
-    @defer.inlineCallbacks
     def get_app_service_by_user_id(self, user_id):
         """Retrieve an application service from their user ID.
 
@@ -174,37 +55,23 @@ class ApplicationServiceStore(SQLBaseStore):
         Returns:
             synapse.appservice.ApplicationService or None.
         """
-
-        yield self.cache_defer  # make sure the cache is ready
-
         for service in self.services_cache:
             if service.sender == user_id:
-                defer.returnValue(service)
-                return
-        defer.returnValue(None)
+                return defer.succeed(service)
+        return defer.succeed(None)
 
-    @defer.inlineCallbacks
-    def get_app_service_by_token(self, token, from_cache=True):
+    def get_app_service_by_token(self, token):
         """Get the application service with the given appservice token.
 
         Args:
             token (str): The application service token.
-            from_cache (bool): True to get this service from the cache, False to
-                               check the database.
-        Raises:
-            StoreError if there was a problem retrieving this service.
+        Returns:
+            synapse.appservice.ApplicationService or None.
         """
-        yield self.cache_defer  # make sure the cache is ready
-
-        if from_cache:
-            for service in self.services_cache:
-                if service.token == token:
-                    defer.returnValue(service)
-                    return
-            defer.returnValue(None)
-
-        # TODO: The from_cache=False impl
-        # TODO: This should be JOINed with the application_services_regex table.
+        for service in self.services_cache:
+            if service.token == token:
+                return defer.succeed(service)
+        return defer.succeed(None)
 
     def get_app_service_rooms(self, service):
         """Get a list of RoomsForUser for this application service.
@@ -277,12 +144,7 @@ class ApplicationServiceStore(SQLBaseStore):
 
         return rooms_for_user_matching_user_id
 
-    @defer.inlineCallbacks
-    def _populate_cache(self):
-        """Populates the ApplicationServiceCache from the database."""
-        sql = ("SELECT * FROM application_services LEFT JOIN "
-               "application_services_regex ON application_services.id = "
-               "application_services_regex.as_id")
+    def _parse_services_dict(self, results):
         # SQL results in the form:
         # [
         #   {
@@ -296,12 +158,14 @@ class ApplicationServiceStore(SQLBaseStore):
         #   }
         # ]
         services = {}
-        results = yield self._execute_and_decode("_populate_cache", sql)
         for res in results:
             as_token = res["token"]
+            if as_token is None:
+                continue
             if as_token not in services:
                 # add the service
                 services[as_token] = {
+                    "id": res["id"],
                     "url": res["url"],
                     "token": as_token,
                     "hs_token": res["hs_token"],
@@ -319,20 +183,287 @@ class ApplicationServiceStore(SQLBaseStore):
             try:
                 services[as_token]["namespaces"][
                     ApplicationService.NS_LIST[ns_int]].append(
-                    simplejson.loads(res["regex"])
+                    json.loads(res["regex"])
                 )
             except IndexError:
                 logger.error("Bad namespace enum '%s'. %s", ns_int, res)
             except JSONDecodeError:
                 logger.error("Bad regex object '%s'", res["regex"])
 
-        # TODO get last successful txn id f.e. service
+        service_list = []
         for service in services.values():
-            logger.info("Found application service: %s", service)
-            self.services_cache.append(ApplicationService(
+            service_list.append(ApplicationService(
                 token=service["token"],
                 url=service["url"],
                 namespaces=service["namespaces"],
                 hs_token=service["hs_token"],
-                sender=service["sender"]
+                sender=service["sender"],
+                id=service["id"]
             ))
+        return service_list
+
+    def _load_appservice(self, as_info):
+        required_string_fields = [
+            "url", "as_token", "hs_token", "sender_localpart"
+        ]
+        for field in required_string_fields:
+            if not isinstance(as_info.get(field), basestring):
+                raise KeyError("Required string field: '%s'", field)
+
+        localpart = as_info["sender_localpart"]
+        if urllib.quote(localpart) != localpart:
+            raise ValueError(
+                "sender_localpart needs characters which are not URL encoded."
+            )
+        user = UserID(localpart, self.hostname)
+        user_id = user.to_string()
+
+        # namespace checks
+        if not isinstance(as_info.get("namespaces"), dict):
+            raise KeyError("Requires 'namespaces' object.")
+        for ns in ApplicationService.NS_LIST:
+            # specific namespaces are optional
+            if ns in as_info["namespaces"]:
+                # expect a list of dicts with exclusive and regex keys
+                for regex_obj in as_info["namespaces"][ns]:
+                    if not isinstance(regex_obj, dict):
+                        raise ValueError(
+                            "Expected namespace entry in %s to be an object,"
+                            " but got %s", ns, regex_obj
+                        )
+                    if not isinstance(regex_obj.get("regex"), basestring):
+                        raise ValueError(
+                            "Missing/bad type 'regex' key in %s", regex_obj
+                        )
+                    if not isinstance(regex_obj.get("exclusive"), bool):
+                        raise ValueError(
+                            "Missing/bad type 'exclusive' key in %s", regex_obj
+                        )
+        return ApplicationService(
+            token=as_info["as_token"],
+            url=as_info["url"],
+            namespaces=as_info["namespaces"],
+            hs_token=as_info["hs_token"],
+            sender=user_id,
+            id=as_info["as_token"]  # the token is the only unique thing here
+        )
+
+    def _populate_appservice_cache(self, config_files):
+        """Populates a cache of Application Services from the config files."""
+        if not isinstance(config_files, list):
+            logger.warning(
+                "Expected %s to be a list of AS config files.", config_files
+            )
+            return
+
+        for config_file in config_files:
+            try:
+                with open(config_file, 'r') as f:
+                    appservice = self._load_appservice(yaml.load(f))
+                    logger.info("Loaded application service: %s", appservice)
+                    self.services_cache.append(appservice)
+            except Exception as e:
+                logger.error("Failed to load appservice from '%s'", config_file)
+                logger.exception(e)
+
+
+class ApplicationServiceTransactionStore(SQLBaseStore):
+
+    def __init__(self, hs):
+        super(ApplicationServiceTransactionStore, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def get_appservices_by_state(self, state):
+        """Get a list of application services based on their state.
+
+        Args:
+            state(ApplicationServiceState): The state to filter on.
+        Returns:
+            A Deferred which resolves to a list of ApplicationServices, which
+            may be empty.
+        """
+        results = yield self._simple_select_list(
+            "application_services_state",
+            dict(state=state),
+            ["as_id"]
+        )
+        # NB: This assumes this class is linked with ApplicationServiceStore
+        as_list = yield self.get_app_services()
+        services = []
+
+        for res in results:
+            for service in as_list:
+                if service.id == res["as_id"]:
+                    services.append(service)
+        defer.returnValue(services)
+
+    @defer.inlineCallbacks
+    def get_appservice_state(self, service):
+        """Get the application service state.
+
+        Args:
+            service(ApplicationService): The service whose state to set.
+        Returns:
+            A Deferred which resolves to ApplicationServiceState.
+        """
+        result = yield self._simple_select_one(
+            "application_services_state",
+            dict(as_id=service.id),
+            ["state"],
+            allow_none=True
+        )
+        if result:
+            defer.returnValue(result.get("state"))
+            return
+        defer.returnValue(None)
+
+    def set_appservice_state(self, service, state):
+        """Set the application service state.
+
+        Args:
+            service(ApplicationService): The service whose state to set.
+            state(ApplicationServiceState): The connectivity state to apply.
+        Returns:
+            A Deferred which resolves when the state was set successfully.
+        """
+        return self._simple_upsert(
+            "application_services_state",
+            dict(as_id=service.id),
+            dict(state=state)
+        )
+
+    def create_appservice_txn(self, service, events):
+        """Atomically creates a new transaction for this application service
+        with the given list of events.
+
+        Args:
+            service(ApplicationService): The service who the transaction is for.
+            events(list<Event>): A list of events to put in the transaction.
+        Returns:
+            AppServiceTransaction: A new transaction.
+        """
+        return self.runInteraction(
+            "create_appservice_txn",
+            self._create_appservice_txn,
+            service, events
+        )
+
+    def _create_appservice_txn(self, txn, service, events):
+        # work out new txn id (highest txn id for this service += 1)
+        # The highest id may be the last one sent (in which case it is last_txn)
+        # or it may be the highest in the txns list (which are waiting to be/are
+        # being sent)
+        last_txn_id = self._get_last_txn(txn, service.id)
+
+        result = txn.execute(
+            "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
+            (service.id,)
+        )
+        highest_txn_id = result.fetchone()[0]
+        if highest_txn_id is None:
+            highest_txn_id = 0
+
+        new_txn_id = max(highest_txn_id, last_txn_id) + 1
+
+        # Insert new txn into txn table
+        event_ids = [e.event_id for e in events]
+        txn.execute(
+            "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
+            "VALUES(?,?,?)",
+            (service.id, new_txn_id, json.dumps(event_ids))
+        )
+        return AppServiceTransaction(
+            service=service, id=new_txn_id, events=events
+        )
+
+    def complete_appservice_txn(self, txn_id, service):
+        """Completes an application service transaction.
+
+        Args:
+            txn_id(str): The transaction ID being completed.
+            service(ApplicationService): The application service which was sent
+            this transaction.
+        Returns:
+            A Deferred which resolves if this transaction was stored
+            successfully.
+        """
+        return self.runInteraction(
+            "complete_appservice_txn",
+            self._complete_appservice_txn,
+            txn_id, service
+        )
+
+    def _complete_appservice_txn(self, txn, txn_id, service):
+        txn_id = int(txn_id)
+
+        # Debugging query: Make sure the txn being completed is EXACTLY +1 from
+        # what was there before. If it isn't, we've got problems (e.g. the AS
+        # has probably missed some events), so whine loudly but still continue,
+        # since it shouldn't fail completion of the transaction.
+        last_txn_id = self._get_last_txn(txn, service.id)
+        if (last_txn_id + 1) != txn_id:
+            logger.error(
+                "appservice: Completing a transaction which has an ID > 1 from "
+                "the last ID sent to this AS. We've either dropped events or "
+                "sent it to the AS out of order. FIX ME. last_txn=%s "
+                "completing_txn=%s service_id=%s", last_txn_id, txn_id,
+                service.id
+            )
+
+        # Set current txn_id for AS to 'txn_id'
+        self._simple_upsert_txn(
+            txn, "application_services_state", dict(as_id=service.id),
+            dict(last_txn=txn_id)
+        )
+
+        # Delete txn
+        self._simple_delete_txn(
+            txn, "application_services_txns",
+            dict(txn_id=txn_id, as_id=service.id)
+        )
+
+    def get_oldest_unsent_txn(self, service):
+        """Get the oldest transaction which has not been sent for this
+        service.
+
+        Args:
+            service(ApplicationService): The app service to get the oldest txn.
+        Returns:
+            A Deferred which resolves to an AppServiceTransaction or
+            None.
+        """
+        return self.runInteraction(
+            "get_oldest_unsent_appservice_txn",
+            self._get_oldest_unsent_txn,
+            service
+        )
+
+    def _get_oldest_unsent_txn(self, txn, service):
+        # Monotonically increasing txn ids, so just select the smallest
+        # one in the txns table (we delete them when they are sent)
+        result = txn.execute(
+            "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
+            (service.id,)
+        )
+        entry = self.cursor_to_dict(result)[0]
+        if not entry or entry["txn_id"] is None:
+            # the min(txn_id) part will force a row, so entry may not be None
+            return None
+
+        event_ids = json.loads(entry["event_ids"])
+        events = self._get_events_txn(txn, event_ids)
+
+        return AppServiceTransaction(
+            service=service, id=entry["txn_id"], events=events
+        )
+
+    def _get_last_txn(self, txn, service_id):
+        result = txn.execute(
+            "SELECT last_txn FROM application_services_state WHERE as_id=?",
+            (service_id,)
+        )
+        last_txn_id = result.fetchone()
+        if last_txn_id is None or last_txn_id[0] is None:  # no row exists
+            return 0
+        else:
+            return int(last_txn_id[0])  # select 'last_txn' col
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a86230d92c..2425f57f5f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -94,7 +94,7 @@ class EventsStore(SQLBaseStore):
                            current_state=None):
 
         # Remove the any existing cache entries for the event_id
-        self._get_event_cache.pop(event.event_id)
+        self._invalidate_get_event_cache(event.event_id)
 
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
@@ -356,7 +356,7 @@ class EventsStore(SQLBaseStore):
 
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
-        self._get_event_cache.pop(event.redacts)
+        self._invalidate_get_event_cache(event.redacts)
         txn.execute(
             "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
             (event.event_id, event.redacts)
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
new file mode 100644
index 0000000000..2b27e2a429
--- /dev/null
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -0,0 +1,30 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services_state(
+    as_id TEXT PRIMARY KEY,
+    state TEXT,
+    last_txn TEXT
+);
+
+CREATE TABLE IF NOT EXISTS application_services_txns(
+    as_id TEXT NOT NULL,
+    txn_id INTEGER NOT NULL,
+    event_ids TEXT NOT NULL,
+    UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+);
+
+
+
diff --git a/synapse/util/expiringcache.py b/synapse/util/expiringcache.py
index 1c7859297a..06d1eea01b 100644
--- a/synapse/util/expiringcache.py
+++ b/synapse/util/expiringcache.py
@@ -65,7 +65,7 @@ class ExpiringCache(object):
         if self._max_len and len(self._cache.keys()) > self._max_len:
             sorted_entries = sorted(
                 self._cache.items(),
-                key=lambda k, v: v.time,
+                key=lambda (k, v): v.time,
             )
 
             for k, _ in sorted_entries[self._max_len:]:
diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py
index 65d5792907..2f7b615f78 100644
--- a/synapse/util/lrucache.py
+++ b/synapse/util/lrucache.py
@@ -90,12 +90,16 @@ class LruCache(object):
         def cache_len():
             return len(cache)
 
+        def cache_contains(key):
+            return key in cache
+
         self.sentinel = object()
         self.get = cache_get
         self.set = cache_set
         self.setdefault = cache_set_default
         self.pop = cache_pop
         self.len = cache_len
+        self.contains = cache_contains
 
     def __getitem__(self, key):
         result = self.get(key, self.sentinel)
@@ -114,3 +118,6 @@ class LruCache(object):
 
     def __len__(self):
         return self.len()
+
+    def __contains__(self, key):
+        return self.contains(key)