summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/_base.py21
-rw-r--r--synapse/config/_base.pyi1
-rw-r--r--synapse/config/workers.py37
-rw-r--r--synapse/handlers/federation.py44
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/room.py14
-rw-r--r--synapse/handlers/room_member.py7
-rw-r--r--synapse/replication/http/federation.py12
-rw-r--r--synapse/replication/tcp/handler.py2
-rw-r--r--synapse/replication/tcp/streams/events.py4
-rw-r--r--synapse/storage/databases/__init__.py2
-rw-r--r--synapse/storage/databases/main/event_federation.py2
-rw-r--r--synapse/storage/databases/main/events.py12
-rw-r--r--synapse/storage/databases/main/events_worker.py66
-rw-r--r--synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql16
-rw-r--r--synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres26
-rw-r--r--synapse/storage/util/id_generators.py10
17 files changed, 210 insertions, 80 deletions
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index ad5ab6ad62..bb9bf8598d 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -832,11 +832,26 @@ class ShardedWorkerHandlingConfig:
     def should_handle(self, instance_name: str, key: str) -> bool:
         """Whether this instance is responsible for handling the given key.
         """
-
-        # If multiple instances are not defined we always return true.
+        # If multiple instances are not defined we always return true
         if not self.instances or len(self.instances) == 1:
             return True
 
+        return self.get_instance(key) == instance_name
+
+    def get_instance(self, key: str) -> str:
+        """Get the instance responsible for handling the given key.
+
+        Note: For things like federation sending the config for which instance
+        is sending is known only to the sender instance if there is only one.
+        Therefore `should_handle` should be used where possible.
+        """
+
+        if not self.instances:
+            return "master"
+
+        if len(self.instances) == 1:
+            return self.instances[0]
+
         # We shard by taking the hash, modulo it by the number of instances and
         # then checking whether this instance matches the instance at that
         # index.
@@ -846,7 +861,7 @@ class ShardedWorkerHandlingConfig:
         dest_hash = sha256(key.encode("utf8")).digest()
         dest_int = int.from_bytes(dest_hash, byteorder="little")
         remainder = dest_int % (len(self.instances))
-        return self.instances[remainder] == instance_name
+        return self.instances[remainder]
 
 
 __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index eb911e8f9f..b8faafa9bd 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
     instances: List[str]
     def __init__(self, instances: List[str]) -> None: ...
     def should_handle(self, instance_name: str, key: str) -> bool: ...
+    def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index c784a71508..f23e42cdf9 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -13,12 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Union
+
 import attr
 
 from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
 from .server import ListenerConfig, parse_listener_def
 
 
+def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
+    """Helper for allowing parsing a string or list of strings to a config
+    option expecting a list of strings.
+    """
+
+    if isinstance(obj, str):
+        return [obj]
+    return obj
+
+
 @attr.s
 class InstanceLocationConfig:
     """The host and port to talk to an instance via HTTP replication.
@@ -33,11 +45,13 @@ class WriterLocations:
     """Specifies the instances that write various streams.
 
     Attributes:
-        events: The instance that writes to the event and backfill streams.
-        events: The instance that writes to the typing stream.
+        events: The instances that write to the event and backfill streams.
+        typing: The instance that writes to the typing stream.
     """
 
-    events = attr.ib(default="master", type=str)
+    events = attr.ib(
+        default=["master"], type=List[str], converter=_instance_to_list_converter
+    )
     typing = attr.ib(default="master", type=str)
 
 
@@ -105,15 +119,18 @@ class WorkerConfig(Config):
         writers = config.get("stream_writers") or {}
         self.writers = WriterLocations(**writers)
 
-        # Check that the configured writer for events and typing also appears in
+        # Check that the configured writers for events and typing also appears in
         # `instance_map`.
         for stream in ("events", "typing"):
-            instance = getattr(self.writers, stream)
-            if instance != "master" and instance not in self.instance_map:
-                raise ConfigError(
-                    "Instance %r is configured to write %s but does not appear in `instance_map` config."
-                    % (instance, stream)
-                )
+            instances = _instance_to_list_converter(getattr(self.writers, stream))
+            for instance in instances:
+                if instance != "master" and instance not in self.instance_map:
+                    raise ConfigError(
+                        "Instance %r is configured to write %s but does not appear in `instance_map` config."
+                        % (instance, stream)
+                    )
+
+        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """\
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c195eba830..a5734bebab 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -896,7 +896,8 @@ class FederationHandler(BaseHandler):
                 )
             )
 
-        await self._handle_new_events(dest, ev_infos, backfilled=True)
+        if ev_infos:
+            await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
 
         # Step 2: Persist the rest of the events in the chunk one by one
         events.sort(key=lambda e: e.depth)
@@ -1189,7 +1190,7 @@ class FederationHandler(BaseHandler):
             event_infos.append(_NewEventInfo(event, None, auth))
 
         await self._handle_new_events(
-            destination, event_infos,
+            destination, room_id, event_infos,
         )
 
     def _sanity_check_event(self, ev):
@@ -1336,15 +1337,15 @@ class FederationHandler(BaseHandler):
             )
 
             max_stream_id = await self._persist_auth_tree(
-                origin, auth_chain, state, event, room_version_obj
+                origin, room_id, auth_chain, state, event, room_version_obj
             )
 
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
-            #
-            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.config.worker.writers.events, "events", max_stream_id
+                self.config.worker.events_shard_config.get_instance(room_id),
+                "events",
+                max_stream_id,
             )
 
             # Check whether this room is the result of an upgrade of a room we already know
@@ -1593,7 +1594,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = await self.state_handler.compute_event_context(event)
-        await self.persist_events_and_notify([(event, context)])
+        await self.persist_events_and_notify(event.room_id, [(event, context)])
 
         return event
 
@@ -1620,7 +1621,9 @@ class FederationHandler(BaseHandler):
         await self.federation_client.send_leave(host_list, event)
 
         context = await self.state_handler.compute_event_context(event)
-        stream_id = await self.persist_events_and_notify([(event, context)])
+        stream_id = await self.persist_events_and_notify(
+            event.room_id, [(event, context)]
+        )
 
         return event, stream_id
 
@@ -1868,7 +1871,7 @@ class FederationHandler(BaseHandler):
                 )
 
             await self.persist_events_and_notify(
-                [(event, context)], backfilled=backfilled
+                event.room_id, [(event, context)], backfilled=backfilled
             )
         except Exception:
             run_in_background(
@@ -1881,6 +1884,7 @@ class FederationHandler(BaseHandler):
     async def _handle_new_events(
         self,
         origin: str,
+        room_id: str,
         event_infos: Iterable[_NewEventInfo],
         backfilled: bool = False,
     ) -> None:
@@ -1912,6 +1916,7 @@ class FederationHandler(BaseHandler):
         )
 
         await self.persist_events_and_notify(
+            room_id,
             [
                 (ev_info.event, context)
                 for ev_info, context in zip(event_infos, contexts)
@@ -1922,6 +1927,7 @@ class FederationHandler(BaseHandler):
     async def _persist_auth_tree(
         self,
         origin: str,
+        room_id: str,
         auth_events: List[EventBase],
         state: List[EventBase],
         event: EventBase,
@@ -1936,6 +1942,7 @@ class FederationHandler(BaseHandler):
 
         Args:
             origin: Where the events came from
+            room_id,
             auth_events
             state
             event
@@ -2010,17 +2017,20 @@ class FederationHandler(BaseHandler):
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
         await self.persist_events_and_notify(
+            room_id,
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
-            ]
+            ],
         )
 
         new_event_context = await self.state_handler.compute_event_context(
             event, old_state=state
         )
 
-        return await self.persist_events_and_notify([(event, new_event_context)])
+        return await self.persist_events_and_notify(
+            room_id, [(event, new_event_context)]
+        )
 
     async def _prep_event(
         self,
@@ -2871,6 +2881,7 @@ class FederationHandler(BaseHandler):
 
     async def persist_events_and_notify(
         self,
+        room_id: str,
         event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
     ) -> int:
@@ -2878,14 +2889,19 @@ class FederationHandler(BaseHandler):
         necessary.
 
         Args:
-            event_and_contexts:
+            room_id: The room ID of events being persisted.
+            event_and_contexts: Sequence of events with their associated
+                context that should be persisted. All events must belong to
+                the same room.
             backfilled: Whether these events are a result of
                 backfilling or not
         """
-        if self.config.worker.writers.events != self._instance_name:
+        instance = self.config.worker.events_shard_config.get_instance(room_id)
+        if instance != self._instance_name:
             result = await self._send_events(
-                instance_name=self.config.worker.writers.events,
+                instance_name=instance,
                 store=self.store,
+                room_id=room_id,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled,
             )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e54e2b322b..a8fe5cf4e2 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -376,9 +376,8 @@ class EventCreationHandler:
         self.notifier = hs.get_notifier()
         self.config = hs.config
         self.require_membership_for_aliases = hs.config.require_membership_for_aliases
-        self._is_event_writer = (
-            self.config.worker.writers.events == hs.get_instance_name()
-        )
+        self._events_shard_config = self.config.worker.events_shard_config
+        self._instance_name = hs.get_instance_name()
 
         self.room_invite_state_types = self.hs.config.room_invite_state_types
 
@@ -902,9 +901,10 @@ class EventCreationHandler:
 
         try:
             # If we're a worker we need to hit out to the master.
-            if not self._is_event_writer:
+            writer_instance = self._events_shard_config.get_instance(event.room_id)
+            if writer_instance != self._instance_name:
                 result = await self.send_event(
-                    instance_name=self.config.worker.writers.events,
+                    instance_name=writer_instance,
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -972,8 +972,10 @@ class EventCreationHandler:
 
         This should only be run on the instance in charge of persisting events.
         """
-        assert self._is_event_writer
         assert self.storage.persistence is not None
+        assert self._events_shard_config.should_handle(
+            self._instance_name, event.room_id
+        )
 
         if ratelimit:
             # We check if this is a room admin redacting an event so that we
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 53d85ab97d..eeade6ad3f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler):
 
         # Always wait for room creation to progate before returning
         await self._replication.wait_for_stream_position(
-            self.hs.config.worker.writers.events, "events", last_stream_id
+            self.hs.config.worker.events_shard_config.get_instance(room_id),
+            "events",
+            last_stream_id,
         )
 
         return result, last_stream_id
@@ -1259,10 +1261,10 @@ class RoomShutdownHandler:
             # We now wait for the create room to come back in via replication so
             # that we can assume that all the joins/invites have propogated before
             # we try and auto join below.
-            #
-            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.hs.config.worker.writers.events, "events", stream_id
+                self.hs.config.worker.events_shard_config.get_instance(new_room_id),
+                "events",
+                stream_id,
             )
         else:
             new_room_id = None
@@ -1292,7 +1294,9 @@ class RoomShutdownHandler:
 
                 # Wait for leave to come in over replication before trying to forget.
                 await self._replication.wait_for_stream_position(
-                    self.hs.config.worker.writers.events, "events", stream_id
+                    self.hs.config.worker.events_shard_config.get_instance(room_id),
+                    "events",
+                    stream_id,
                 )
 
                 await self.room_member_handler.forget(target_requester.user, room_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 100f335b80..01a6e88262 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -82,13 +82,6 @@ class RoomMemberHandler:
         self._enable_lookup = hs.config.enable_3pid_lookup
         self.allow_per_room_profiles = self.config.allow_per_room_profiles
 
-        self._event_stream_writer_instance = hs.config.worker.writers.events
-        self._is_on_event_persistence_instance = (
-            self._event_stream_writer_instance == hs.get_instance_name()
-        )
-        if self._is_on_event_persistence_instance:
-            self.persist_event_storage = hs.get_storage().persistence
-
         self._join_rate_limiter_local = Ratelimiter(
             clock=self.clock,
             rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 6b56315148..5c8be747e1 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -65,10 +65,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         self.federation_handler = hs.get_handlers().federation_handler
 
     @staticmethod
-    async def _serialize_payload(store, event_and_contexts, backfilled):
+    async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
         """
         Args:
             store
+            room_id (str)
             event_and_contexts (list[tuple[FrozenEvent, EventContext]])
             backfilled (bool): Whether or not the events are the result of
                 backfilling
@@ -88,7 +89,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
                 }
             )
 
-        payload = {"events": event_payloads, "backfilled": backfilled}
+        payload = {
+            "events": event_payloads,
+            "backfilled": backfilled,
+            "room_id": room_id,
+        }
 
         return payload
 
@@ -96,6 +101,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         with Measure(self.clock, "repl_fed_send_events_parse"):
             content = parse_json_object_from_request(request)
 
+            room_id = content["room_id"]
             backfilled = content["backfilled"]
 
             event_payloads = content["events"]
@@ -120,7 +126,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         logger.info("Got %d events from federation", len(event_and_contexts))
 
         max_stream_id = await self.federation_handler.persist_events_and_notify(
-            event_and_contexts, backfilled
+            room_id, event_and_contexts, backfilled
         )
 
         return 200, {"max_stream_id": max_stream_id}
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1c303f3a46..b323841f73 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ class ReplicationCommandHandler:
             if isinstance(stream, (EventsStream, BackfillStream)):
                 # Only add EventStream and BackfillStream as a source on the
                 # instance in charge of event persistence.
-                if hs.config.worker.writers.events == hs.get_instance_name():
+                if hs.get_instance_name() in hs.config.worker.writers.events:
                     self._streams_to_replicate.append(stream)
 
                 continue
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f929fc3954..ccc7ca30d8 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import List, Tuple, Type
 
 import attr
 
-from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
+from ._base import Stream, StreamUpdateResult, Token
 
 """Handling of the 'events' replication stream
 
@@ -117,7 +117,7 @@ class EventsStream(Stream):
         self._store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
-            current_token_without_instance(self._store.get_current_events_token),
+            self._store._stream_id_gen.get_current_token_for_writer,
             self._update_function,
         )
 
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 985b12df91..aa5d490624 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -75,7 +75,7 @@ class Databases:
 
                     # If we're on a process that can persist events also
                     # instantiate a `PersistEventsStore`
-                    if hs.config.worker.writers.events == hs.get_instance_name():
+                    if hs.get_instance_name() in hs.config.worker.writers.events:
                         persist_events = PersistEventsStore(hs, database, main)
 
                 if "state" in database_config.databases:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 0b69aa6a94..4c3c162acf 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         """
 
         if stream_ordering <= self.stream_ordering_month_ago:
-            raise StoreError(400, "stream_ordering too old")
+            raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
 
         sql = """
                 SELECT event_id FROM stream_ordering_to_exterm
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 9cd1403b38..9a80f419e3 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -32,7 +32,7 @@ from synapse.logging.utils import log_function
 from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.search import SearchEntry
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import StateMap, get_domain_from_id
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.iterutils import batch_iter
@@ -97,18 +97,21 @@ class PersistEventsStore:
         self.store = main_data_store
         self.database_engine = db.engine
         self._clock = hs.get_clock()
+        self._instance_name = hs.get_instance_name()
 
         self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
         self.is_mine_id = hs.is_mine_id
 
         # Ideally we'd move these ID gens here, unfortunately some other ID
         # generators are chained off them so doing so is a bit of a PITA.
-        self._backfill_id_gen = self.store._backfill_id_gen  # type: StreamIdGenerator
-        self._stream_id_gen = self.store._stream_id_gen  # type: StreamIdGenerator
+        self._backfill_id_gen = (
+            self.store._backfill_id_gen
+        )  # type: MultiWriterIdGenerator
+        self._stream_id_gen = self.store._stream_id_gen  # type: MultiWriterIdGenerator
 
         # This should only exist on instances that are configured to write
         assert (
-            hs.config.worker.writers.events == hs.get_instance_name()
+            hs.get_instance_name() in hs.config.worker.writers.events
         ), "Can only instantiate EventsStore on master"
 
     async def _persist_events_and_state_updates(
@@ -809,6 +812,7 @@ class PersistEventsStore:
             table="events",
             values=[
                 {
+                    "instance_name": self._instance_name,
                     "stream_ordering": event.internal_metadata.stream_ordering,
                     "topological_ordering": event.depth,
                     "depth": event.depth,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a7a73cc3d8..17f5997b89 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -42,7 +42,8 @@ from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.types import Collection, get_domain_from_id
 from synapse.util.caches.descriptors import Cache, cached
 from synapse.util.iterutils import batch_iter
@@ -78,27 +79,54 @@ class EventsWorkerStore(SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super(EventsWorkerStore, self).__init__(database, db_conn, hs)
 
-        if hs.config.worker.writers.events == hs.get_instance_name():
-            # We are the process in charge of generating stream ids for events,
-            # so instantiate ID generators based on the database
-            self._stream_id_gen = StreamIdGenerator(
-                db_conn, "events", "stream_ordering",
+        if isinstance(database.engine, PostgresEngine):
+            # If we're using Postgres than we can use `MultiWriterIdGenerator`
+            # regardless of whether this process writes to the streams or not.
+            self._stream_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                instance_name=hs.get_instance_name(),
+                table="events",
+                instance_column="instance_name",
+                id_column="stream_ordering",
+                sequence_name="events_stream_seq",
             )
-            self._backfill_id_gen = StreamIdGenerator(
-                db_conn,
-                "events",
-                "stream_ordering",
-                step=-1,
-                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+            self._backfill_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                instance_name=hs.get_instance_name(),
+                table="events",
+                instance_column="instance_name",
+                id_column="stream_ordering",
+                sequence_name="events_backfill_stream_seq",
+                positive=False,
             )
         else:
-            # Another process is in charge of persisting events and generating
-            # stream IDs: rely on the replication streams to let us know which
-            # IDs we can process.
-            self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
-            self._backfill_id_gen = SlavedIdTracker(
-                db_conn, "events", "stream_ordering", step=-1
-            )
+            # We shouldn't be running in worker mode with SQLite, but its useful
+            # to support it for unit tests.
+            #
+            # If this process is the writer than we need to use
+            # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
+            # updated over replication. (Multiple writers are not supported for
+            # SQLite).
+            if hs.get_instance_name() in hs.config.worker.writers.events:
+                self._stream_id_gen = StreamIdGenerator(
+                    db_conn, "events", "stream_ordering",
+                )
+                self._backfill_id_gen = StreamIdGenerator(
+                    db_conn,
+                    "events",
+                    "stream_ordering",
+                    step=-1,
+                    extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+                )
+            else:
+                self._stream_id_gen = SlavedIdTracker(
+                    db_conn, "events", "stream_ordering"
+                )
+                self._backfill_id_gen = SlavedIdTracker(
+                    db_conn, "events", "stream_ordering", step=-1
+                )
 
         self._get_event_cache = Cache(
             "*getEvent*",
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
new file mode 100644
index 0000000000..98ff76d709
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
@@ -0,0 +1,16 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
new file mode 100644
index 0000000000..97c1e6a0c5
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
@@ -0,0 +1,26 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
+
+SELECT setval('events_stream_seq', (
+    SELECT COALESCE(MAX(stream_ordering), 1) FROM events
+));
+
+CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
+
+SELECT setval('events_backfill_stream_seq', (
+    SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
+));
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 2a66b3ad4e..1de2b91587 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -240,8 +240,12 @@ class MultiWriterIdGenerator:
         # gaps should be relatively rare it's still worth doing the book keeping
         # that allows us to skip forwards when there are gapless runs of
         # positions.
+        #
+        # We start at 1 here as a) the first generated stream ID will be 2, and
+        # b) other parts of the code assume that stream IDs are strictly greater
+        # than 0.
         self._persisted_upto_position = (
-            min(self._current_positions.values()) if self._current_positions else 0
+            min(self._current_positions.values()) if self._current_positions else 1
         )
         self._known_persisted_positions = []  # type: List[int]
 
@@ -398,9 +402,7 @@ class MultiWriterIdGenerator:
         equal to it have been successfully persisted.
         """
 
-        # Currently we don't support this operation, as it's not obvious how to
-        # condense the stream positions of multiple writers into a single int.
-        raise NotImplementedError()
+        return self.get_persisted_upto_position()
 
     def get_current_token_for_writer(self, instance_name: str) -> int:
         """Returns the position of the given writer.