summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2020-09-04 10:19:42 +0100
committerGitHub <noreply@github.com>2020-09-04 10:19:42 +0100
commit9f8abdcc3828e942f09cc62e29dff93dbaa01ec7 (patch)
tree5811a075386a883af3e9da43c6123beb812d70c4
parentAdd type hints to more handlers (#8244) (diff)
downloadsynapse-9f8abdcc3828e942f09cc62e29dff93dbaa01ec7.tar.xz
Revert "Add experimental support for sharding event persister. (#8170)" (#8242)
* Revert "Add experimental support for sharding event persister. (#8170)"

This reverts commit 82c1ee1c22a87b9e6e3179947014b0f11c0a1ac3.

* Changelog
-rw-r--r--changelog.d/8170.feature1
-rw-r--r--changelog.d/8242.feature1
-rw-r--r--synapse/config/_base.py21
-rw-r--r--synapse/config/_base.pyi1
-rw-r--r--synapse/config/workers.py37
-rw-r--r--synapse/handlers/federation.py44
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/room.py14
-rw-r--r--synapse/handlers/room_member.py7
-rw-r--r--synapse/replication/http/federation.py12
-rw-r--r--synapse/replication/tcp/handler.py2
-rw-r--r--synapse/replication/tcp/streams/events.py4
-rw-r--r--synapse/storage/databases/__init__.py2
-rw-r--r--synapse/storage/databases/main/event_federation.py2
-rw-r--r--synapse/storage/databases/main/events.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py66
-rw-r--r--synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql16
-rw-r--r--synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres26
-rw-r--r--synapse/storage/util/id_generators.py10
19 files changed, 78 insertions, 206 deletions
diff --git a/changelog.d/8170.feature b/changelog.d/8170.feature
deleted file mode 100644
index b363e929ea..0000000000
--- a/changelog.d/8170.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for sharding event persister.
diff --git a/changelog.d/8242.feature b/changelog.d/8242.feature
new file mode 100644
index 0000000000..f6891e360d
--- /dev/null
+++ b/changelog.d/8242.feature
@@ -0,0 +1 @@
+Back out experimental support for sharding event persister. **PLEASE REMOVE THIS LINE FROM THE FINAL CHANGELOG**
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 73f0717b0d..1417487427 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -832,26 +832,11 @@ class ShardedWorkerHandlingConfig:
     def should_handle(self, instance_name: str, key: str) -> bool:
         """Whether this instance is responsible for handling the given key.
         """
-        # If multiple instances are not defined we always return true
+
+        # If multiple instances are not defined we always return true.
         if not self.instances or len(self.instances) == 1:
             return True
 
-        return self.get_instance(key) == instance_name
-
-    def get_instance(self, key: str) -> str:
-        """Get the instance responsible for handling the given key.
-
-        Note: For things like federation sending the config for which instance
-        is sending is known only to the sender instance if there is only one.
-        Therefore `should_handle` should be used where possible.
-        """
-
-        if not self.instances:
-            return "master"
-
-        if len(self.instances) == 1:
-            return self.instances[0]
-
         # We shard by taking the hash, modulo it by the number of instances and
         # then checking whether this instance matches the instance at that
         # index.
@@ -861,7 +846,7 @@ class ShardedWorkerHandlingConfig:
         dest_hash = sha256(key.encode("utf8")).digest()
         dest_int = int.from_bytes(dest_hash, byteorder="little")
         remainder = dest_int % (len(self.instances))
-        return self.instances[remainder]
+        return self.instances[remainder] == instance_name
 
 
 __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index b8faafa9bd..eb911e8f9f 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -142,4 +142,3 @@ class ShardedWorkerHandlingConfig:
     instances: List[str]
     def __init__(self, instances: List[str]) -> None: ...
     def should_handle(self, instance_name: str, key: str) -> bool: ...
-    def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index f23e42cdf9..c784a71508 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -13,24 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import List, Union
-
 import attr
 
 from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
 from .server import ListenerConfig, parse_listener_def
 
 
-def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
-    """Helper for allowing parsing a string or list of strings to a config
-    option expecting a list of strings.
-    """
-
-    if isinstance(obj, str):
-        return [obj]
-    return obj
-
-
 @attr.s
 class InstanceLocationConfig:
     """The host and port to talk to an instance via HTTP replication.
@@ -45,13 +33,11 @@ class WriterLocations:
     """Specifies the instances that write various streams.
 
     Attributes:
-        events: The instances that write to the event and backfill streams.
-        typing: The instance that writes to the typing stream.
+        events: The instance that writes to the event and backfill streams.
+        events: The instance that writes to the typing stream.
     """
 
-    events = attr.ib(
-        default=["master"], type=List[str], converter=_instance_to_list_converter
-    )
+    events = attr.ib(default="master", type=str)
     typing = attr.ib(default="master", type=str)
 
 
@@ -119,18 +105,15 @@ class WorkerConfig(Config):
         writers = config.get("stream_writers") or {}
         self.writers = WriterLocations(**writers)
 
-        # Check that the configured writers for events and typing also appears in
+        # Check that the configured writer for events and typing also appears in
         # `instance_map`.
         for stream in ("events", "typing"):
-            instances = _instance_to_list_converter(getattr(self.writers, stream))
-            for instance in instances:
-                if instance != "master" and instance not in self.instance_map:
-                    raise ConfigError(
-                        "Instance %r is configured to write %s but does not appear in `instance_map` config."
-                        % (instance, stream)
-                    )
-
-        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
+            instance = getattr(self.writers, stream)
+            if instance != "master" and instance not in self.instance_map:
+                raise ConfigError(
+                    "Instance %r is configured to write %s but does not appear in `instance_map` config."
+                    % (instance, stream)
+                )
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """\
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 310c7f7138..43f2986f89 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -923,8 +923,7 @@ class FederationHandler(BaseHandler):
                 )
             )
 
-        if ev_infos:
-            await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
+        await self._handle_new_events(dest, ev_infos, backfilled=True)
 
         # Step 2: Persist the rest of the events in the chunk one by one
         events.sort(key=lambda e: e.depth)
@@ -1217,7 +1216,7 @@ class FederationHandler(BaseHandler):
             event_infos.append(_NewEventInfo(event, None, auth))
 
         await self._handle_new_events(
-            destination, room_id, event_infos,
+            destination, event_infos,
         )
 
     def _sanity_check_event(self, ev):
@@ -1364,15 +1363,15 @@ class FederationHandler(BaseHandler):
             )
 
             max_stream_id = await self._persist_auth_tree(
-                origin, room_id, auth_chain, state, event, room_version_obj
+                origin, auth_chain, state, event, room_version_obj
             )
 
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
+            #
+            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.config.worker.events_shard_config.get_instance(room_id),
-                "events",
-                max_stream_id,
+                self.config.worker.writers.events, "events", max_stream_id
             )
 
             # Check whether this room is the result of an upgrade of a room we already know
@@ -1626,7 +1625,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = await self.state_handler.compute_event_context(event)
-        await self.persist_events_and_notify(event.room_id, [(event, context)])
+        await self.persist_events_and_notify([(event, context)])
 
         return event
 
@@ -1653,9 +1652,7 @@ class FederationHandler(BaseHandler):
         await self.federation_client.send_leave(host_list, event)
 
         context = await self.state_handler.compute_event_context(event)
-        stream_id = await self.persist_events_and_notify(
-            event.room_id, [(event, context)]
-        )
+        stream_id = await self.persist_events_and_notify([(event, context)])
 
         return event, stream_id
 
@@ -1903,7 +1900,7 @@ class FederationHandler(BaseHandler):
                 )
 
             await self.persist_events_and_notify(
-                event.room_id, [(event, context)], backfilled=backfilled
+                [(event, context)], backfilled=backfilled
             )
         except Exception:
             run_in_background(
@@ -1916,7 +1913,6 @@ class FederationHandler(BaseHandler):
     async def _handle_new_events(
         self,
         origin: str,
-        room_id: str,
         event_infos: Iterable[_NewEventInfo],
         backfilled: bool = False,
     ) -> None:
@@ -1948,7 +1944,6 @@ class FederationHandler(BaseHandler):
         )
 
         await self.persist_events_and_notify(
-            room_id,
             [
                 (ev_info.event, context)
                 for ev_info, context in zip(event_infos, contexts)
@@ -1959,7 +1954,6 @@ class FederationHandler(BaseHandler):
     async def _persist_auth_tree(
         self,
         origin: str,
-        room_id: str,
         auth_events: List[EventBase],
         state: List[EventBase],
         event: EventBase,
@@ -1974,7 +1968,6 @@ class FederationHandler(BaseHandler):
 
         Args:
             origin: Where the events came from
-            room_id,
             auth_events
             state
             event
@@ -2049,20 +2042,17 @@ class FederationHandler(BaseHandler):
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
         await self.persist_events_and_notify(
-            room_id,
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
-            ],
+            ]
         )
 
         new_event_context = await self.state_handler.compute_event_context(
             event, old_state=state
         )
 
-        return await self.persist_events_and_notify(
-            room_id, [(event, new_event_context)]
-        )
+        return await self.persist_events_and_notify([(event, new_event_context)])
 
     async def _prep_event(
         self,
@@ -2913,7 +2903,6 @@ class FederationHandler(BaseHandler):
 
     async def persist_events_and_notify(
         self,
-        room_id: str,
         event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
     ) -> int:
@@ -2921,19 +2910,14 @@ class FederationHandler(BaseHandler):
         necessary.
 
         Args:
-            room_id: The room ID of events being persisted.
-            event_and_contexts: Sequence of events with their associated
-                context that should be persisted. All events must belong to
-                the same room.
+            event_and_contexts:
             backfilled: Whether these events are a result of
                 backfilling or not
         """
-        instance = self.config.worker.events_shard_config.get_instance(room_id)
-        if instance != self._instance_name:
+        if self.config.worker.writers.events != self._instance_name:
             result = await self._send_events(
-                instance_name=instance,
+                instance_name=self.config.worker.writers.events,
                 store=self.store,
-                room_id=room_id,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled,
             )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6398774c02..72bb638167 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -376,8 +376,9 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
         self.require_membership_for_aliases = hs.config.require_membership_for_aliases
-        self._events_shard_config = self.config.worker.events_shard_config
-        self._instance_name = hs.get_instance_name()
+        self._is_event_writer = (
+            self.config.worker.writers.events == hs.get_instance_name()
+        )
 
         self.room_invite_state_types = self.hs.config.room_invite_state_types
 
@@ -903,10 +904,9 @@ class EventCreationHandler(object):
 
         try:
             # If we're a worker we need to hit out to the master.
-            writer_instance = self._events_shard_config.get_instance(event.room_id)
-            if writer_instance != self._instance_name:
+            if not self._is_event_writer:
                 result = await self.send_event(
-                    instance_name=writer_instance,
+                    instance_name=self.config.worker.writers.events,
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -974,9 +974,7 @@ class EventCreationHandler(object):
 
         This should only be run on the instance in charge of persisting events.
         """
-        assert self._events_shard_config.should_handle(
-            self._instance_name, event.room_id
-        )
+        assert self._is_event_writer
 
         if ratelimit:
             # We check if this is a room admin redacting an event so that we
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 55794c3057..9d5b1828df 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -804,9 +804,7 @@ class RoomCreationHandler(BaseHandler):
 
         # Always wait for room creation to progate before returning
         await self._replication.wait_for_stream_position(
-            self.hs.config.worker.events_shard_config.get_instance(room_id),
-            "events",
-            last_stream_id,
+            self.hs.config.worker.writers.events, "events", last_stream_id
         )
 
         return result, last_stream_id
@@ -1262,10 +1260,10 @@ class RoomShutdownHandler(object):
             # We now wait for the create room to come back in via replication so
             # that we can assume that all the joins/invites have propogated before
             # we try and auto join below.
+            #
+            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.hs.config.worker.events_shard_config.get_instance(new_room_id),
-                "events",
-                stream_id,
+                self.hs.config.worker.writers.events, "events", stream_id
             )
         else:
             new_room_id = None
@@ -1295,9 +1293,7 @@ class RoomShutdownHandler(object):
 
                 # Wait for leave to come in over replication before trying to forget.
                 await self._replication.wait_for_stream_position(
-                    self.hs.config.worker.events_shard_config.get_instance(room_id),
-                    "events",
-                    stream_id,
+                    self.hs.config.worker.writers.events, "events", stream_id
                 )
 
                 await self.room_member_handler.forget(target_requester.user, room_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index e5cb3c2d5c..a7962b0ada 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -82,6 +82,13 @@ class RoomMemberHandler(object):
         self._enable_lookup = hs.config.enable_3pid_lookup
         self.allow_per_room_profiles = self.config.allow_per_room_profiles
 
+        self._event_stream_writer_instance = hs.config.worker.writers.events
+        self._is_on_event_persistence_instance = (
+            self._event_stream_writer_instance == hs.get_instance_name()
+        )
+        if self._is_on_event_persistence_instance:
+            self.persist_event_storage = hs.get_storage().persistence
+
         self._join_rate_limiter_local = Ratelimiter(
             clock=self.clock,
             rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 5c8be747e1..6b56315148 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -65,11 +65,10 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         self.federation_handler = hs.get_handlers().federation_handler
 
     @staticmethod
-    async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
+    async def _serialize_payload(store, event_and_contexts, backfilled):
         """
         Args:
             store
-            room_id (str)
             event_and_contexts (list[tuple[FrozenEvent, EventContext]])
             backfilled (bool): Whether or not the events are the result of
                 backfilling
@@ -89,11 +88,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
                 }
             )
 
-        payload = {
-            "events": event_payloads,
-            "backfilled": backfilled,
-            "room_id": room_id,
-        }
+        payload = {"events": event_payloads, "backfilled": backfilled}
 
         return payload
 
@@ -101,7 +96,6 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         with Measure(self.clock, "repl_fed_send_events_parse"):
             content = parse_json_object_from_request(request)
 
-            room_id = content["room_id"]
             backfilled = content["backfilled"]
 
             event_payloads = content["events"]
@@ -126,7 +120,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         logger.info("Got %d events from federation", len(event_and_contexts))
 
         max_stream_id = await self.federation_handler.persist_events_and_notify(
-            room_id, event_and_contexts, backfilled
+            event_and_contexts, backfilled
         )
 
         return 200, {"max_stream_id": max_stream_id}
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b323841f73..1c303f3a46 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ class ReplicationCommandHandler:
             if isinstance(stream, (EventsStream, BackfillStream)):
                 # Only add EventStream and BackfillStream as a source on the
                 # instance in charge of event persistence.
-                if hs.get_instance_name() in hs.config.worker.writers.events:
+                if hs.config.worker.writers.events == hs.get_instance_name():
                     self._streams_to_replicate.append(stream)
 
                 continue
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 3705618b4f..16c63ff4ec 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import List, Tuple, Type
 
 import attr
 
-from ._base import Stream, StreamUpdateResult, Token
+from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
 
 """Handling of the 'events' replication stream
 
@@ -117,7 +117,7 @@ class EventsStream(Stream):
         self._store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
-            self._store._stream_id_gen.get_current_token_for_writer,
+            current_token_without_instance(self._store.get_current_events_token),
             self._update_function,
         )
 
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index c73d54fb67..0ac854aee2 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -68,7 +68,7 @@ class Databases(object):
 
                     # If we're on a process that can persist events also
                     # instantiate a `PersistEventsStore`
-                    if hs.get_instance_name() in hs.config.worker.writers.events:
+                    if hs.config.worker.writers.events == hs.get_instance_name():
                         persist_events = PersistEventsStore(hs, database, main)
 
                 if "state" in database_config.databases:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 4c3c162acf..0b69aa6a94 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         """
 
         if stream_ordering <= self.stream_ordering_month_ago:
-            raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
+            raise StoreError(400, "stream_ordering too old")
 
         sql = """
                 SELECT event_id FROM stream_ordering_to_exterm
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b94fe7ac17..b3d27a2ee7 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -97,7 +97,6 @@ class PersistEventsStore:
         self.store = main_data_store
         self.database_engine = db.engine
         self._clock = hs.get_clock()
-        self._instance_name = hs.get_instance_name()
 
         self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
         self.is_mine_id = hs.is_mine_id
@@ -109,7 +108,7 @@ class PersistEventsStore:
 
         # This should only exist on instances that are configured to write
         assert (
-            hs.get_instance_name() in hs.config.worker.writers.events
+            hs.config.worker.writers.events == hs.get_instance_name()
         ), "Can only instantiate EventsStore on master"
 
     async def _persist_events_and_state_updates(
@@ -801,7 +800,6 @@ class PersistEventsStore:
             table="events",
             values=[
                 {
-                    "instance_name": self._instance_name,
                     "stream_ordering": event.internal_metadata.stream_ordering,
                     "topological_ordering": event.depth,
                     "depth": event.depth,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 17f5997b89..a7a73cc3d8 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -42,8 +42,7 @@ from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
-from synapse.storage.engines import PostgresEngine
-from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
+from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import Collection, get_domain_from_id
 from synapse.util.caches.descriptors import Cache, cached
 from synapse.util.iterutils import batch_iter
@@ -79,54 +78,27 @@ class EventsWorkerStore(SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super(EventsWorkerStore, self).__init__(database, db_conn, hs)
 
-        if isinstance(database.engine, PostgresEngine):
-            # If we're using Postgres than we can use `MultiWriterIdGenerator`
-            # regardless of whether this process writes to the streams or not.
-            self._stream_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                instance_name=hs.get_instance_name(),
-                table="events",
-                instance_column="instance_name",
-                id_column="stream_ordering",
-                sequence_name="events_stream_seq",
+        if hs.config.worker.writers.events == hs.get_instance_name():
+            # We are the process in charge of generating stream ids for events,
+            # so instantiate ID generators based on the database
+            self._stream_id_gen = StreamIdGenerator(
+                db_conn, "events", "stream_ordering",
             )
-            self._backfill_id_gen = MultiWriterIdGenerator(
-                db_conn=db_conn,
-                db=database,
-                instance_name=hs.get_instance_name(),
-                table="events",
-                instance_column="instance_name",
-                id_column="stream_ordering",
-                sequence_name="events_backfill_stream_seq",
-                positive=False,
+            self._backfill_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                step=-1,
+                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
             )
         else:
-            # We shouldn't be running in worker mode with SQLite, but its useful
-            # to support it for unit tests.
-            #
-            # If this process is the writer than we need to use
-            # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
-            # updated over replication. (Multiple writers are not supported for
-            # SQLite).
-            if hs.get_instance_name() in hs.config.worker.writers.events:
-                self._stream_id_gen = StreamIdGenerator(
-                    db_conn, "events", "stream_ordering",
-                )
-                self._backfill_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "events",
-                    "stream_ordering",
-                    step=-1,
-                    extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                )
-            else:
-                self._stream_id_gen = SlavedIdTracker(
-                    db_conn, "events", "stream_ordering"
-                )
-                self._backfill_id_gen = SlavedIdTracker(
-                    db_conn, "events", "stream_ordering", step=-1
-                )
+            # Another process is in charge of persisting events and generating
+            # stream IDs: rely on the replication streams to let us know which
+            # IDs we can process.
+            self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
+            self._backfill_id_gen = SlavedIdTracker(
+                db_conn, "events", "stream_ordering", step=-1
+            )
 
         self._get_event_cache = Cache(
             "*getEvent*",
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
deleted file mode 100644
index 98ff76d709..0000000000
--- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
+++ /dev/null
@@ -1,16 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-ALTER TABLE events ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
deleted file mode 100644
index 97c1e6a0c5..0000000000
--- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
+++ /dev/null
@@ -1,26 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
-
-SELECT setval('events_stream_seq', (
-    SELECT COALESCE(MAX(stream_ordering), 1) FROM events
-));
-
-CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
-
-SELECT setval('events_backfill_stream_seq', (
-    SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
-));
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 8fd21c2bf8..9f3d23f0a5 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -231,12 +231,8 @@ class MultiWriterIdGenerator:
         # gaps should be relatively rare it's still worth doing the book keeping
         # that allows us to skip forwards when there are gapless runs of
         # positions.
-        #
-        # We start at 1 here as a) the first generated stream ID will be 2, and
-        # b) other parts of the code assume that stream IDs are strictly greater
-        # than 0.
         self._persisted_upto_position = (
-            min(self._current_positions.values()) if self._current_positions else 1
+            min(self._current_positions.values()) if self._current_positions else 0
         )
         self._known_persisted_positions = []  # type: List[int]
 
@@ -366,7 +362,9 @@ class MultiWriterIdGenerator:
         equal to it have been successfully persisted.
         """
 
-        return self.get_persisted_upto_position()
+        # Currently we don't support this operation, as it's not obvious how to
+        # condense the stream positions of multiple writers into a single int.
+        raise NotImplementedError()
 
     def get_current_token_for_writer(self, instance_name: str) -> int:
         """Returns the position of the given writer.