diff --git a/changelog.d/8294.feature b/changelog.d/8294.feature
new file mode 100644
index 0000000000..b363e929ea
--- /dev/null
+++ b/changelog.d/8294.feature
@@ -0,0 +1 @@
+Add experimental support for sharding event persister.
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index ad5ab6ad62..bb9bf8598d 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -832,11 +832,26 @@ class ShardedWorkerHandlingConfig:
def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key.
"""
-
- # If multiple instances are not defined we always return true.
+ # If multiple instances are not defined we always return true
if not self.instances or len(self.instances) == 1:
return True
+ return self.get_instance(key) == instance_name
+
+ def get_instance(self, key: str) -> str:
+ """Get the instance responsible for handling the given key.
+
+ Note: For things like federation sending the config for which instance
+ is sending is known only to the sender instance if there is only one.
+ Therefore `should_handle` should be used where possible.
+ """
+
+ if not self.instances:
+ return "master"
+
+ if len(self.instances) == 1:
+ return self.instances[0]
+
# We shard by taking the hash, modulo it by the number of instances and
# then checking whether this instance matches the instance at that
# index.
@@ -846,7 +861,7 @@ class ShardedWorkerHandlingConfig:
dest_hash = sha256(key.encode("utf8")).digest()
dest_int = int.from_bytes(dest_hash, byteorder="little")
remainder = dest_int % (len(self.instances))
- return self.instances[remainder] == instance_name
+ return self.instances[remainder]
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index eb911e8f9f..b8faafa9bd 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...
+ def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index c784a71508..f23e42cdf9 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -13,12 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import List, Union
+
import attr
from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from .server import ListenerConfig, parse_listener_def
+def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
+ """Helper for allowing parsing a string or list of strings to a config
+ option expecting a list of strings.
+ """
+
+ if isinstance(obj, str):
+ return [obj]
+ return obj
+
+
@attr.s
class InstanceLocationConfig:
"""The host and port to talk to an instance via HTTP replication.
@@ -33,11 +45,13 @@ class WriterLocations:
"""Specifies the instances that write various streams.
Attributes:
- events: The instance that writes to the event and backfill streams.
- events: The instance that writes to the typing stream.
+ events: The instances that write to the event and backfill streams.
+ typing: The instance that writes to the typing stream.
"""
- events = attr.ib(default="master", type=str)
+ events = attr.ib(
+ default=["master"], type=List[str], converter=_instance_to_list_converter
+ )
typing = attr.ib(default="master", type=str)
@@ -105,15 +119,18 @@ class WorkerConfig(Config):
writers = config.get("stream_writers") or {}
self.writers = WriterLocations(**writers)
- # Check that the configured writer for events and typing also appears in
+ # Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing"):
- instance = getattr(self.writers, stream)
- if instance != "master" and instance not in self.instance_map:
- raise ConfigError(
- "Instance %r is configured to write %s but does not appear in `instance_map` config."
- % (instance, stream)
- )
+ instances = _instance_to_list_converter(getattr(self.writers, stream))
+ for instance in instances:
+ if instance != "master" and instance not in self.instance_map:
+ raise ConfigError(
+ "Instance %r is configured to write %s but does not appear in `instance_map` config."
+ % (instance, stream)
+ )
+
+ self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c195eba830..a5734bebab 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -896,7 +896,8 @@ class FederationHandler(BaseHandler):
)
)
- await self._handle_new_events(dest, ev_infos, backfilled=True)
+ if ev_infos:
+ await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
@@ -1189,7 +1190,7 @@ class FederationHandler(BaseHandler):
event_infos.append(_NewEventInfo(event, None, auth))
await self._handle_new_events(
- destination, event_infos,
+ destination, room_id, event_infos,
)
def _sanity_check_event(self, ev):
@@ -1336,15 +1337,15 @@ class FederationHandler(BaseHandler):
)
max_stream_id = await self._persist_auth_tree(
- origin, auth_chain, state, event, room_version_obj
+ origin, room_id, auth_chain, state, event, room_version_obj
)
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
- #
- # TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- self.config.worker.writers.events, "events", max_stream_id
+ self.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ max_stream_id,
)
# Check whether this room is the result of an upgrade of a room we already know
@@ -1593,7 +1594,7 @@ class FederationHandler(BaseHandler):
)
context = await self.state_handler.compute_event_context(event)
- await self.persist_events_and_notify([(event, context)])
+ await self.persist_events_and_notify(event.room_id, [(event, context)])
return event
@@ -1620,7 +1621,9 @@ class FederationHandler(BaseHandler):
await self.federation_client.send_leave(host_list, event)
context = await self.state_handler.compute_event_context(event)
- stream_id = await self.persist_events_and_notify([(event, context)])
+ stream_id = await self.persist_events_and_notify(
+ event.room_id, [(event, context)]
+ )
return event, stream_id
@@ -1868,7 +1871,7 @@ class FederationHandler(BaseHandler):
)
await self.persist_events_and_notify(
- [(event, context)], backfilled=backfilled
+ event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
@@ -1881,6 +1884,7 @@ class FederationHandler(BaseHandler):
async def _handle_new_events(
self,
origin: str,
+ room_id: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
) -> None:
@@ -1912,6 +1916,7 @@ class FederationHandler(BaseHandler):
)
await self.persist_events_and_notify(
+ room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
@@ -1922,6 +1927,7 @@ class FederationHandler(BaseHandler):
async def _persist_auth_tree(
self,
origin: str,
+ room_id: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
@@ -1936,6 +1942,7 @@ class FederationHandler(BaseHandler):
Args:
origin: Where the events came from
+ room_id,
auth_events
state
event
@@ -2010,17 +2017,20 @@ class FederationHandler(BaseHandler):
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
await self.persist_events_and_notify(
+ room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
- ]
+ ],
)
new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)
- return await self.persist_events_and_notify([(event, new_event_context)])
+ return await self.persist_events_and_notify(
+ room_id, [(event, new_event_context)]
+ )
async def _prep_event(
self,
@@ -2871,6 +2881,7 @@ class FederationHandler(BaseHandler):
async def persist_events_and_notify(
self,
+ room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
@@ -2878,14 +2889,19 @@ class FederationHandler(BaseHandler):
necessary.
Args:
- event_and_contexts:
+ room_id: The room ID of events being persisted.
+ event_and_contexts: Sequence of events with their associated
+ context that should be persisted. All events must belong to
+ the same room.
backfilled: Whether these events are a result of
backfilling or not
"""
- if self.config.worker.writers.events != self._instance_name:
+ instance = self.config.worker.events_shard_config.get_instance(room_id)
+ if instance != self._instance_name:
result = await self._send_events(
- instance_name=self.config.worker.writers.events,
+ instance_name=instance,
store=self.store,
+ room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e54e2b322b..a8fe5cf4e2 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -376,9 +376,8 @@ class EventCreationHandler:
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
- self._is_event_writer = (
- self.config.worker.writers.events == hs.get_instance_name()
- )
+ self._events_shard_config = self.config.worker.events_shard_config
+ self._instance_name = hs.get_instance_name()
self.room_invite_state_types = self.hs.config.room_invite_state_types
@@ -902,9 +901,10 @@ class EventCreationHandler:
try:
# If we're a worker we need to hit out to the master.
- if not self._is_event_writer:
+ writer_instance = self._events_shard_config.get_instance(event.room_id)
+ if writer_instance != self._instance_name:
result = await self.send_event(
- instance_name=self.config.worker.writers.events,
+ instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
@@ -972,8 +972,10 @@ class EventCreationHandler:
This should only be run on the instance in charge of persisting events.
"""
- assert self._is_event_writer
assert self.storage.persistence is not None
+ assert self._events_shard_config.should_handle(
+ self._instance_name, event.room_id
+ )
if ratelimit:
# We check if this is a room admin redacting an event so that we
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 53d85ab97d..eeade6ad3f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler):
# Always wait for room creation to progate before returning
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", last_stream_id
+ self.hs.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ last_stream_id,
)
return result, last_stream_id
@@ -1259,10 +1261,10 @@ class RoomShutdownHandler:
# We now wait for the create room to come back in via replication so
# that we can assume that all the joins/invites have propogated before
# we try and auto join below.
- #
- # TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", stream_id
+ self.hs.config.worker.events_shard_config.get_instance(new_room_id),
+ "events",
+ stream_id,
)
else:
new_room_id = None
@@ -1292,7 +1294,9 @@ class RoomShutdownHandler:
# Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", stream_id
+ self.hs.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ stream_id,
)
await self.room_member_handler.forget(target_requester.user, room_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 100f335b80..01a6e88262 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -82,13 +82,6 @@ class RoomMemberHandler:
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles
- self._event_stream_writer_instance = hs.config.worker.writers.events
- self._is_on_event_persistence_instance = (
- self._event_stream_writer_instance == hs.get_instance_name()
- )
- if self._is_on_event_persistence_instance:
- self.persist_event_storage = hs.get_storage().persistence
-
self._join_rate_limiter_local = Ratelimiter(
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 6b56315148..5c8be747e1 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -65,10 +65,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.federation_handler = hs.get_handlers().federation_handler
@staticmethod
- async def _serialize_payload(store, event_and_contexts, backfilled):
+ async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"""
Args:
store
+ room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
@@ -88,7 +89,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
}
)
- payload = {"events": event_payloads, "backfilled": backfilled}
+ payload = {
+ "events": event_payloads,
+ "backfilled": backfilled,
+ "room_id": room_id,
+ }
return payload
@@ -96,6 +101,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)
+ room_id = content["room_id"]
backfilled = content["backfilled"]
event_payloads = content["events"]
@@ -120,7 +126,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
logger.info("Got %d events from federation", len(event_and_contexts))
max_stream_id = await self.federation_handler.persist_events_and_notify(
- event_and_contexts, backfilled
+ room_id, event_and_contexts, backfilled
)
return 200, {"max_stream_id": max_stream_id}
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1c303f3a46..b323841f73 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -109,7 +109,7 @@ class ReplicationCommandHandler:
if isinstance(stream, (EventsStream, BackfillStream)):
# Only add EventStream and BackfillStream as a source on the
# instance in charge of event persistence.
- if hs.config.worker.writers.events == hs.get_instance_name():
+ if hs.get_instance_name() in hs.config.worker.writers.events:
self._streams_to_replicate.append(stream)
continue
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f929fc3954..ccc7ca30d8 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import List, Tuple, Type
import attr
-from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
+from ._base import Stream, StreamUpdateResult, Token
"""Handling of the 'events' replication stream
@@ -117,7 +117,7 @@ class EventsStream(Stream):
self._store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
- current_token_without_instance(self._store.get_current_events_token),
+ self._store._stream_id_gen.get_current_token_for_writer,
self._update_function,
)
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 985b12df91..aa5d490624 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -75,7 +75,7 @@ class Databases:
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
- if hs.config.worker.writers.events == hs.get_instance_name():
+ if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main)
if "state" in database_config.databases:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 0b69aa6a94..4c3c162acf 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
"""
if stream_ordering <= self.stream_ordering_month_ago:
- raise StoreError(400, "stream_ordering too old")
+ raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
sql = """
SELECT event_id FROM stream_ordering_to_exterm
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 9cd1403b38..9a80f419e3 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -32,7 +32,7 @@ from synapse.logging.utils import log_function
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.iterutils import batch_iter
@@ -97,18 +97,21 @@ class PersistEventsStore:
self.store = main_data_store
self.database_engine = db.engine
self._clock = hs.get_clock()
+ self._instance_name = hs.get_instance_name()
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
self.is_mine_id = hs.is_mine_id
# Ideally we'd move these ID gens here, unfortunately some other ID
# generators are chained off them so doing so is a bit of a PITA.
- self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
- self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator
+ self._backfill_id_gen = (
+ self.store._backfill_id_gen
+ ) # type: MultiWriterIdGenerator
+ self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator
# This should only exist on instances that are configured to write
assert (
- hs.config.worker.writers.events == hs.get_instance_name()
+ hs.get_instance_name() in hs.config.worker.writers.events
), "Can only instantiate EventsStore on master"
async def _persist_events_and_state_updates(
@@ -809,6 +812,7 @@ class PersistEventsStore:
table="events",
values=[
{
+ "instance_name": self._instance_name,
"stream_ordering": event.internal_metadata.stream_ordering,
"topological_ordering": event.depth,
"depth": event.depth,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a7a73cc3d8..17f5997b89 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -42,7 +42,8 @@ from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import Collection, get_domain_from_id
from synapse.util.caches.descriptors import Cache, cached
from synapse.util.iterutils import batch_iter
@@ -78,27 +79,54 @@ class EventsWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
- if hs.config.worker.writers.events == hs.get_instance_name():
- # We are the process in charge of generating stream ids for events,
- # so instantiate ID generators based on the database
- self._stream_id_gen = StreamIdGenerator(
- db_conn, "events", "stream_ordering",
+ if isinstance(database.engine, PostgresEngine):
+ # If we're using Postgres than we can use `MultiWriterIdGenerator`
+ # regardless of whether this process writes to the streams or not.
+ self._stream_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ instance_name=hs.get_instance_name(),
+ table="events",
+ instance_column="instance_name",
+ id_column="stream_ordering",
+ sequence_name="events_stream_seq",
)
- self._backfill_id_gen = StreamIdGenerator(
- db_conn,
- "events",
- "stream_ordering",
- step=-1,
- extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+ self._backfill_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ instance_name=hs.get_instance_name(),
+ table="events",
+ instance_column="instance_name",
+ id_column="stream_ordering",
+ sequence_name="events_backfill_stream_seq",
+ positive=False,
)
else:
- # Another process is in charge of persisting events and generating
- # stream IDs: rely on the replication streams to let us know which
- # IDs we can process.
- self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
- self._backfill_id_gen = SlavedIdTracker(
- db_conn, "events", "stream_ordering", step=-1
- )
+ # We shouldn't be running in worker mode with SQLite, but its useful
+ # to support it for unit tests.
+ #
+ # If this process is the writer than we need to use
+ # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
+ # updated over replication. (Multiple writers are not supported for
+ # SQLite).
+ if hs.get_instance_name() in hs.config.worker.writers.events:
+ self._stream_id_gen = StreamIdGenerator(
+ db_conn, "events", "stream_ordering",
+ )
+ self._backfill_id_gen = StreamIdGenerator(
+ db_conn,
+ "events",
+ "stream_ordering",
+ step=-1,
+ extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+ )
+ else:
+ self._stream_id_gen = SlavedIdTracker(
+ db_conn, "events", "stream_ordering"
+ )
+ self._backfill_id_gen = SlavedIdTracker(
+ db_conn, "events", "stream_ordering", step=-1
+ )
self._get_event_cache = Cache(
"*getEvent*",
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
new file mode 100644
index 0000000000..98ff76d709
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
@@ -0,0 +1,16 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
new file mode 100644
index 0000000000..97c1e6a0c5
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
@@ -0,0 +1,26 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
+
+SELECT setval('events_stream_seq', (
+ SELECT COALESCE(MAX(stream_ordering), 1) FROM events
+));
+
+CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
+
+SELECT setval('events_backfill_stream_seq', (
+ SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
+));
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 2a66b3ad4e..1de2b91587 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -240,8 +240,12 @@ class MultiWriterIdGenerator:
# gaps should be relatively rare it's still worth doing the book keeping
# that allows us to skip forwards when there are gapless runs of
# positions.
+ #
+ # We start at 1 here as a) the first generated stream ID will be 2, and
+ # b) other parts of the code assume that stream IDs are strictly greater
+ # than 0.
self._persisted_upto_position = (
- min(self._current_positions.values()) if self._current_positions else 0
+ min(self._current_positions.values()) if self._current_positions else 1
)
self._known_persisted_positions = [] # type: List[int]
@@ -398,9 +402,7 @@ class MultiWriterIdGenerator:
equal to it have been successfully persisted.
"""
- # Currently we don't support this operation, as it's not obvious how to
- # condense the stream positions of multiple writers into a single int.
- raise NotImplementedError()
+ return self.get_persisted_upto_position()
def get_current_token_for_writer(self, instance_name: str) -> int:
"""Returns the position of the given writer.
|