summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/_base.py3
-rw-r--r--synapse/handlers/federation.py11
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/room.py17
-rw-r--r--synapse/handlers/sync.py98
-rw-r--r--synapse/state.py4
-rw-r--r--synapse/storage/__init__.py20
-rw-r--r--synapse/storage/account_data.py4
-rw-r--r--synapse/storage/events.py21
-rw-r--r--synapse/storage/presence.py6
-rw-r--r--synapse/storage/push_rule.py2
-rw-r--r--synapse/storage/pusher.py2
-rw-r--r--synapse/storage/receipts.py6
-rw-r--r--synapse/storage/state.py2
-rw-r--r--synapse/storage/stream.py2
-rw-r--r--synapse/storage/tags.py6
-rw-r--r--synapse/storage/util/id_generators.py63
-rw-r--r--synapse/util/async.py32
18 files changed, 164 insertions, 145 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 5601ecea6e..5eeb7042c6 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -266,8 +266,7 @@ class BaseHandler(object):
 
                 context = yield state_handler.compute_event_context(
                     builder,
-                    old_state=(prev_member_event,),
-                    outlier=True
+                    old_state=(prev_member_event,)
                 )
 
         if builder.is_state():
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 092802b973..adafd06b24 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1118,11 +1118,9 @@ class FederationHandler(BaseHandler):
         """
         events_to_context = {}
         for e in itertools.chain(auth_events, state):
-            ctx = yield self.state_handler.compute_event_context(
-                e, outlier=True,
-            )
-            events_to_context[e.event_id] = ctx
             e.internal_metadata.outlier = True
+            ctx = yield self.state_handler.compute_event_context(e)
+            events_to_context[e.event_id] = ctx
 
         event_map = {
             e.event_id: e
@@ -1169,7 +1167,7 @@ class FederationHandler(BaseHandler):
         )
 
         new_event_context = yield self.state_handler.compute_event_context(
-            event, old_state=state, outlier=False,
+            event, old_state=state
         )
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
@@ -1181,10 +1179,9 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
-        outlier = event.internal_metadata.is_outlier()
 
         context = yield self.state_handler.compute_event_context(
-            event, old_state=state, outlier=outlier,
+            event, old_state=state,
         )
 
         if not auth_events:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5c50c611ba..0bb111d047 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,6 +21,7 @@ from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.types import UserID, RoomStreamToken, StreamToken
 
@@ -556,14 +557,7 @@ class MessageHandler(BaseHandler):
             except:
                 logger.exception("Failed to get snapshot")
 
-        # Only do N rooms at once
-        n = 5
-        d_list = [handle_room(e) for e in room_list]
-        for i in range(0, len(d_list), n):
-            yield defer.gatherResults(
-                d_list[i:i + n],
-                consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        yield concurrently_execute(handle_room, room_list, 10)
 
         account_data_events = []
         for account_data_type, content in account_data.items():
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ee99ded214..3e1d9282d7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,8 @@ from synapse.api.constants import (
     EventTypes, JoinRules, RoomCreationPreset,
 )
 from synapse.api.errors import AuthError, StoreError, SynapseError
-from synapse.util import stringutils, unwrapFirstError
+from synapse.util import stringutils
+from synapse.util.async import concurrently_execute
 from synapse.util.logcontext import preserve_context_over_fn
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -368,6 +369,8 @@ class RoomListHandler(BaseHandler):
     def _get_public_room_list(self):
         room_ids = yield self.store.get_public_room_ids()
 
+        results = []
+
         @defer.inlineCallbacks
         def handle_room(room_id):
             aliases = yield self.store.get_aliases_for_room(room_id)
@@ -428,18 +431,12 @@ class RoomListHandler(BaseHandler):
             joined_users = yield self.store.get_users_in_room(room_id)
             result["num_joined_members"] = len(joined_users)
 
-            defer.returnValue(result)
+            results.append(result)
 
-        result = []
-        for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
-            chunk_result = yield defer.gatherResults([
-                handle_room(room_id)
-                for room_id in chunk
-            ], consumeErrors=True).addErrback(unwrapFirstError)
-            result.extend(v for v in chunk_result if v)
+        yield concurrently_execute(handle_room, room_ids, 10)
 
         # FIXME (erikj): START is no longer a valid value
-        defer.returnValue({"start": "START", "end": "END", "chunk": result})
+        defer.returnValue({"start": "START", "end": "END", "chunk": results})
 
 
 class RoomContextHandler(BaseHandler):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 20a0626574..231140b655 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -17,8 +17,8 @@ from ._base import BaseHandler
 
 from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.async import concurrently_execute
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -250,58 +250,50 @@ class SyncHandler(BaseHandler):
         joined = []
         invited = []
         archived = []
-        deferreds = []
-
-        room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
-        for room_list_chunk in room_list_chunks:
-            for event in room_list_chunk:
-                if event.membership == Membership.JOIN:
-                    room_sync_deferred = preserve_fn(
-                        self.full_state_sync_for_joined_room
-                    )(
-                        room_id=event.room_id,
-                        sync_config=sync_config,
-                        now_token=now_token,
-                        timeline_since_token=timeline_since_token,
-                        ephemeral_by_room=ephemeral_by_room,
-                        tags_by_room=tags_by_room,
-                        account_data_by_room=account_data_by_room,
-                    )
-                    room_sync_deferred.addCallback(joined.append)
-                    deferreds.append(room_sync_deferred)
-                elif event.membership == Membership.INVITE:
-                    invite = yield self.store.get_event(event.event_id)
-                    invited.append(InvitedSyncResult(
-                        room_id=event.room_id,
-                        invite=invite,
-                    ))
-                elif event.membership in (Membership.LEAVE, Membership.BAN):
-                    # Always send down rooms we were banned or kicked from.
-                    if not sync_config.filter_collection.include_leave:
-                        if event.membership == Membership.LEAVE:
-                            if sync_config.user.to_string() == event.sender:
-                                continue
-
-                    leave_token = now_token.copy_and_replace(
-                        "room_key", "s%d" % (event.stream_ordering,)
-                    )
-                    room_sync_deferred = preserve_fn(
-                        self.full_state_sync_for_archived_room
-                    )(
-                        sync_config=sync_config,
-                        room_id=event.room_id,
-                        leave_event_id=event.event_id,
-                        leave_token=leave_token,
-                        timeline_since_token=timeline_since_token,
-                        tags_by_room=tags_by_room,
-                        account_data_by_room=account_data_by_room,
-                    )
-                    room_sync_deferred.addCallback(archived.append)
-                    deferreds.append(room_sync_deferred)
 
-            yield defer.gatherResults(
-                deferreds, consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        user_id = sync_config.user.to_string()
+
+        @defer.inlineCallbacks
+        def _generate_room_entry(event):
+            if event.membership == Membership.JOIN:
+                room_result = yield self.full_state_sync_for_joined_room(
+                    room_id=event.room_id,
+                    sync_config=sync_config,
+                    now_token=now_token,
+                    timeline_since_token=timeline_since_token,
+                    ephemeral_by_room=ephemeral_by_room,
+                    tags_by_room=tags_by_room,
+                    account_data_by_room=account_data_by_room,
+                )
+                joined.append(room_result)
+            elif event.membership == Membership.INVITE:
+                invite = yield self.store.get_event(event.event_id)
+                invited.append(InvitedSyncResult(
+                    room_id=event.room_id,
+                    invite=invite,
+                ))
+            elif event.membership in (Membership.LEAVE, Membership.BAN):
+                # Always send down rooms we were banned or kicked from.
+                if not sync_config.filter_collection.include_leave:
+                    if event.membership == Membership.LEAVE:
+                        if user_id == event.sender:
+                            return
+
+                leave_token = now_token.copy_and_replace(
+                    "room_key", "s%d" % (event.stream_ordering,)
+                )
+                room_result = yield self.full_state_sync_for_archived_room(
+                    sync_config=sync_config,
+                    room_id=event.room_id,
+                    leave_event_id=event.event_id,
+                    leave_token=leave_token,
+                    timeline_since_token=timeline_since_token,
+                    tags_by_room=tags_by_room,
+                    account_data_by_room=account_data_by_room,
+                )
+                archived.append(room_result)
+
+        yield concurrently_execute(_generate_room_entry, room_list, 10)
 
         account_data_for_user = sync_config.filter_collection.filter_account_data(
             self.account_data_for_user(account_data)
diff --git a/synapse/state.py b/synapse/state.py
index 4a9e148de7..1bca0f8f78 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -101,7 +101,7 @@ class StateHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
-    def compute_event_context(self, event, old_state=None, outlier=False):
+    def compute_event_context(self, event, old_state=None):
         """ Fills out the context with the `current state` of the graph. The
         `current state` here is defined to be the state of the event graph
         just before the event - i.e. it never includes `event`
@@ -116,7 +116,7 @@ class StateHandler(object):
         """
         context = EventContext()
 
-        if outlier:
+        if event.internal_metadata.is_outlier():
             # If this is an outlier, then we know it shouldn't have any current
             # state. Certainly store.get_current_state won't return any, and
             # persisting the event won't store the state group.
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index aaad38039e..57863bba4d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -88,15 +88,6 @@ class DataStore(RoomMemberStore, RoomStore,
         self.hs = hs
         self.database_engine = hs.database_engine
 
-        cur = db_conn.cursor()
-        try:
-            cur.execute("SELECT MIN(stream_ordering) FROM events",)
-            rows = cur.fetchall()
-            self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
-            self.min_stream_token = min(self.min_stream_token, -1)
-        finally:
-            cur.close()
-
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
@@ -105,6 +96,9 @@ class DataStore(RoomMemberStore, RoomStore,
         self._stream_id_gen = StreamIdGenerator(
             db_conn, "events", "stream_ordering"
         )
+        self._backfill_id_gen = StreamIdGenerator(
+            db_conn, "events", "stream_ordering", step=-1
+        )
         self._receipts_id_gen = StreamIdGenerator(
             db_conn, "receipts_linearized", "stream_id"
         )
@@ -129,7 +123,7 @@ class DataStore(RoomMemberStore, RoomStore,
             extra_tables=[("deleted_pushers", "stream_id")],
         )
 
-        events_max = self._stream_id_gen.get_max_token()
+        events_max = self._stream_id_gen.get_current_token()
         event_cache_prefill, min_event_val = self._get_cache_dict(
             db_conn, "events",
             entity_column="room_id",
@@ -145,7 +139,7 @@ class DataStore(RoomMemberStore, RoomStore,
             "MembershipStreamChangeCache", events_max,
         )
 
-        account_max = self._account_data_id_gen.get_max_token()
+        account_max = self._account_data_id_gen.get_current_token()
         self._account_data_stream_cache = StreamChangeCache(
             "AccountDataAndTagsChangeCache", account_max,
         )
@@ -156,7 +150,7 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "presence_stream",
             entity_column="user_id",
             stream_column="stream_id",
-            max_value=self._presence_id_gen.get_max_token(),
+            max_value=self._presence_id_gen.get_current_token(),
         )
         self.presence_stream_cache = StreamChangeCache(
             "PresenceStreamChangeCache", min_presence_val,
@@ -167,7 +161,7 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "push_rules_stream",
             entity_column="user_id",
             stream_column="stream_id",
-            max_value=self._push_rules_stream_id_gen.get_max_token()[0],
+            max_value=self._push_rules_stream_id_gen.get_current_token()[0],
         )
 
         self.push_rules_stream_cache = StreamChangeCache(
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index faddefe219..7a7fbf1e52 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -200,7 +200,7 @@ class AccountDataStore(SQLBaseStore):
                 "add_room_account_data", add_account_data_txn, next_id
             )
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -239,7 +239,7 @@ class AccountDataStore(SQLBaseStore):
                 "add_user_account_data", add_account_data_txn, next_id
             )
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     def _update_max_stream_id(self, txn, next_id):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 83279d65fa..c4dc3b3d51 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -24,7 +24,6 @@ from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
 from canonicaljson import encode_canonical_json
-from contextlib import contextmanager
 from collections import namedtuple
 
 import logging
@@ -66,14 +65,9 @@ class EventsStore(SQLBaseStore):
             return
 
         if backfilled:
-            start = self.min_stream_token - 1
-            self.min_stream_token -= len(events_and_contexts) + 1
-            stream_orderings = range(start, self.min_stream_token, -1)
-
-            @contextmanager
-            def stream_ordering_manager():
-                yield stream_orderings
-            stream_ordering_manager = stream_ordering_manager()
+            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
+                len(events_and_contexts)
+            )
         else:
             stream_ordering_manager = self._stream_id_gen.get_next_mult(
                 len(events_and_contexts)
@@ -130,7 +124,7 @@ class EventsStore(SQLBaseStore):
         except _RollbackButIsFineException:
             pass
 
-        max_persisted_id = yield self._stream_id_gen.get_max_token()
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
         defer.returnValue((stream_ordering, max_persisted_id))
 
     @defer.inlineCallbacks
@@ -198,7 +192,7 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_room_name_and_aliases, event.room_id)
+            txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
 
             # Add an entry to the current_state_resets table to record the point
             # where we clobbered the current state
@@ -1117,10 +1111,7 @@ class EventsStore(SQLBaseStore):
 
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
-
-        # TODO: Fix race with the persit_event txn by using one of the
-        # stream id managers
-        return -self.min_stream_token
+        return -self._backfill_id_gen.get_current_token()
 
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 4cec31e316..59b4ef5ce6 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -68,7 +68,9 @@ class PresenceStore(SQLBaseStore):
                 self._update_presence_txn, stream_orderings, presence_states,
             )
 
-        defer.returnValue((stream_orderings[-1], self._presence_id_gen.get_max_token()))
+        defer.returnValue((
+            stream_orderings[-1], self._presence_id_gen.get_current_token()
+        ))
 
     def _update_presence_txn(self, txn, stream_orderings, presence_states):
         for stream_id, state in zip(stream_orderings, presence_states):
@@ -155,7 +157,7 @@ class PresenceStore(SQLBaseStore):
         defer.returnValue([UserPresenceState(**row) for row in rows])
 
     def get_current_presence_token(self):
-        return self._presence_id_gen.get_max_token()
+        return self._presence_id_gen.get_current_token()
 
     def allow_presence_visible(self, observed_localpart, observer_userid):
         return self._simple_insert(
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9dbad2fd5f..d2bf7f2aec 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -392,7 +392,7 @@ class PushRuleStore(SQLBaseStore):
         """Get the position of the push rules stream.
         Returns a pair of a stream id for the push_rules stream and the
         room stream ordering it corresponds to."""
-        return self._push_rules_stream_id_gen.get_max_token()
+        return self._push_rules_stream_id_gen.get_current_token()
 
     def have_push_rules_changed_for_user(self, user_id, last_id):
         if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 87b2ac5773..d1669c778a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -78,7 +78,7 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(rows)
 
     def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_max_token()
+        return self._pushers_id_gen.get_current_token()
 
     def get_all_updated_pushers(self, last_id, current_id, limit):
         def get_all_updated_pushers_txn(txn):
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6b9d848eaa..4befebc8e2 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -31,7 +31,7 @@ class ReceiptsStore(SQLBaseStore):
         super(ReceiptsStore, self).__init__(hs)
 
         self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token()
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
         )
 
     @cached(num_args=2)
@@ -221,7 +221,7 @@ class ReceiptsStore(SQLBaseStore):
         defer.returnValue(results)
 
     def get_max_receipt_stream_id(self):
-        return self._receipts_id_gen.get_max_token()
+        return self._receipts_id_gen.get_current_token()
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
@@ -346,7 +346,7 @@ class ReceiptsStore(SQLBaseStore):
             room_id, receipt_type, user_id, event_ids, data
         )
 
-        max_persisted_id = self._stream_id_gen.get_max_token()
+        max_persisted_id = self._stream_id_gen.get_current_token()
 
         defer.returnValue((stream_id, max_persisted_id))
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index f84fd0e30a..e9f9406014 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -461,4 +461,4 @@ class StateStore(SQLBaseStore):
         )
 
     def get_state_stream_token(self):
-        return self._state_groups_id_gen.get_max_token()
+        return self._state_groups_id_gen.get_current_token()
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index cf84938be5..76bcd9cd00 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -539,7 +539,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_max_id(self, direction='f'):
-        token = yield self._stream_id_gen.get_max_token()
+        token = yield self._stream_id_gen.get_current_token()
         if direction != 'b':
             defer.returnValue("s%d" % (token,))
         else:
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index a0e6b42b30..9da23f34cb 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -30,7 +30,7 @@ class TagsStore(SQLBaseStore):
         Returns:
             A deferred int.
         """
-        return self._account_data_id_gen.get_max_token()
+        return self._account_data_id_gen.get_current_token()
 
     @cached()
     def get_tags_for_user(self, user_id):
@@ -200,7 +200,7 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -222,7 +222,7 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     def _update_revision_txn(self, txn, user_id, room_id, next_id):
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index a02dfc7d58..f69f1cdad4 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -21,7 +21,7 @@ import threading
 class IdGenerator(object):
     def __init__(self, db_conn, table, column):
         self._lock = threading.Lock()
-        self._next_id = _load_max_id(db_conn, table, column)
+        self._next_id = _load_current_id(db_conn, table, column)
 
     def get_next(self):
         with self._lock:
@@ -29,12 +29,16 @@ class IdGenerator(object):
             return self._next_id
 
 
-def _load_max_id(db_conn, table, column):
+def _load_current_id(db_conn, table, column, step=1):
     cur = db_conn.cursor()
-    cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+    if step == 1:
+        cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+    else:
+        cur.execute("SELECT MIN(%s) FROM %s" % (column, table,))
     val, = cur.fetchone()
     cur.close()
-    return int(val) if val else 1
+    current_id = int(val) if val else step
+    return (max if step > 0 else min)(current_id, step)
 
 
 class StreamIdGenerator(object):
@@ -45,17 +49,32 @@ class StreamIdGenerator(object):
     all ids less than or equal to it have completed. This handles the fact that
     persistence of events can complete out of order.
 
+    Args:
+        db_conn(connection):  A database connection to use to fetch the
+            initial value of the generator from.
+        table(str): A database table to read the initial value of the id
+            generator from.
+        column(str): The column of the database table to read the initial
+            value from the id generator from.
+        extra_tables(list): List of pairs of database tables and columns to
+            use to source the initial value of the generator from. The value
+            with the largest magnitude is used.
+        step(int): which direction the stream ids grow in. +1 to grow
+            upwards, -1 to grow downwards.
+
     Usage:
         with stream_id_gen.get_next() as stream_id:
             # ... persist event ...
     """
-    def __init__(self, db_conn, table, column, extra_tables=[]):
+    def __init__(self, db_conn, table, column, extra_tables=[], step=1):
+        assert step != 0
         self._lock = threading.Lock()
-        self._current_max = _load_max_id(db_conn, table, column)
+        self._step = step
+        self._current = _load_current_id(db_conn, table, column, step)
         for table, column in extra_tables:
-            self._current_max = max(
-                self._current_max,
-                _load_max_id(db_conn, table, column)
+            self._current = (max if step > 0 else min)(
+                self._current,
+                _load_current_id(db_conn, table, column, step)
             )
         self._unfinished_ids = deque()
 
@@ -66,8 +85,8 @@ class StreamIdGenerator(object):
                 # ... persist event ...
         """
         with self._lock:
-            self._current_max += 1
-            next_id = self._current_max
+            self._current += self._step
+            next_id = self._current
 
             self._unfinished_ids.append(next_id)
 
@@ -88,8 +107,12 @@ class StreamIdGenerator(object):
                 # ... persist events ...
         """
         with self._lock:
-            next_ids = range(self._current_max + 1, self._current_max + n + 1)
-            self._current_max += n
+            next_ids = range(
+                self._current + self._step,
+                self._current + self._step * (n + 1),
+                self._step
+            )
+            self._current += n
 
             for next_id in next_ids:
                 self._unfinished_ids.append(next_id)
@@ -105,15 +128,15 @@ class StreamIdGenerator(object):
 
         return manager()
 
-    def get_max_token(self):
+    def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
         with self._lock:
             if self._unfinished_ids:
-                return self._unfinished_ids[0] - 1
+                return self._unfinished_ids[0] - self._step
 
-            return self._current_max
+            return self._current
 
 
 class ChainedIdGenerator(object):
@@ -125,7 +148,7 @@ class ChainedIdGenerator(object):
     def __init__(self, chained_generator, db_conn, table, column):
         self.chained_generator = chained_generator
         self._lock = threading.Lock()
-        self._current_max = _load_max_id(db_conn, table, column)
+        self._current_max = _load_current_id(db_conn, table, column)
         self._unfinished_ids = deque()
 
     def get_next(self):
@@ -137,7 +160,7 @@ class ChainedIdGenerator(object):
         with self._lock:
             self._current_max += 1
             next_id = self._current_max
-            chained_id = self.chained_generator.get_max_token()
+            chained_id = self.chained_generator.get_current_token()
 
             self._unfinished_ids.append((next_id, chained_id))
 
@@ -151,7 +174,7 @@ class ChainedIdGenerator(object):
 
         return manager()
 
-    def get_max_token(self):
+    def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
@@ -160,4 +183,4 @@ class ChainedIdGenerator(object):
                 stream_id, chained_id = self._unfinished_ids[0]
                 return (stream_id - 1, chained_id)
 
-            return (self._current_max, self.chained_generator.get_max_token())
+            return (self._current_max, self.chained_generator.get_current_token())
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 640fae3890..cd4d90f3cf 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -16,7 +16,8 @@
 
 from twisted.internet import defer, reactor
 
-from .logcontext import PreserveLoggingContext
+from .logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util import unwrapFirstError
 
 
 @defer.inlineCallbacks
@@ -107,3 +108,32 @@ class ObservableDeferred(object):
         return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
             id(self), self._result, self._deferred,
         )
+
+
+def concurrently_execute(func, args, limit):
+    """Executes the function with each argument conncurrently while limiting
+    the number of concurrent executions.
+
+    Args:
+        func (func): Function to execute, should return a deferred.
+        args (list): List of arguments to pass to func, each invocation of func
+            gets a signle argument.
+        limit (int): Maximum number of conccurent executions.
+
+    Returns:
+        deferred: Resolved when all function invocations have finished.
+    """
+    it = iter(args)
+
+    @defer.inlineCallbacks
+    def _concurrently_execute_inner():
+        try:
+            while True:
+                yield func(it.next())
+        except StopIteration:
+            pass
+
+    return defer.gatherResults([
+        preserve_fn(_concurrently_execute_inner)()
+        for _ in xrange(limit)
+    ], consumeErrors=True).addErrback(unwrapFirstError)