diff options
author | Erik Johnston <erik@matrix.org> | 2020-08-18 11:18:11 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-08-24 15:49:22 +0100 |
commit | 98b7415fcd79a0e264a054964f57087222fb9e02 (patch) | |
tree | 0c4b8906676bc0d744bd6a96a4fa752154f50226 | |
parent | add get_persisted_upto_position (diff) | |
download | synapse-98b7415fcd79a0e264a054964f57087222fb9e02.tar.xz |
Add multiwriter for events
-rw-r--r-- | synapse/handlers/message.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/events.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 7 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 33 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql | 16 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres | 20 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 46 | ||||
-rw-r--r-- | synapse/storage/util/sequence.py | 8 |
8 files changed, 121 insertions, 15 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c955a86be0..67d9f95202 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -675,7 +675,7 @@ class EventCreationHandler(object): event.event_id, prev_event.event_id, ) - return await self.store.get_stream_id_for_event(prev_event.event_id) + return await self.store.get_stream_token_for_event(prev_event.event_id) return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 16c63ff4ec..3705618b4f 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -19,7 +19,7 @@ from typing import List, Tuple, Type import attr -from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance +from ._base import Stream, StreamUpdateResult, Token """Handling of the 'events' replication stream @@ -117,7 +117,7 @@ class EventsStream(Stream): self._store = hs.get_datastore() super().__init__( hs.get_instance_name(), - current_token_without_instance(self._store.get_current_events_token), + self._store._stream_id_gen.get_current_token_for_writer, self._update_function, ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b90e6de2d5..989372193f 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -34,6 +34,7 @@ from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.search import SearchEntry from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import StateMap, get_domain_from_id +from synapse.util.async_helpers import maybe_awaitable from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.iterutils import batch_iter @@ -97,6 +98,7 @@ class PersistEventsStore: self.store = main_data_store self.database_engine = db.engine self._clock = hs.get_clock() + self._instance_name = hs.get_instance_name() self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages self.is_mine_id = hs.is_mine_id @@ -157,8 +159,8 @@ class PersistEventsStore: len(events_and_contexts) ) else: - stream_ordering_manager = self._stream_id_gen.get_next_mult( - len(events_and_contexts) + stream_ordering_manager = await maybe_awaitable( + self._stream_id_gen.get_next_mult(len(events_and_contexts)) ) with stream_ordering_manager as stream_orderings: @@ -800,6 +802,7 @@ class PersistEventsStore: table="events", values=[ { + "instance_name": self._instance_name, "stream_ordering": event.internal_metadata.stream_ordering, "topological_ordering": event.depth, "depth": event.depth, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4a3333c0db..8f46e04fa6 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -42,7 +42,8 @@ from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool -from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.storage.engines import PostgresEngine +from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.types import Collection, get_domain_from_id from synapse.util.caches.descriptors import Cache, cached from synapse.util.iterutils import batch_iter @@ -81,9 +82,21 @@ class EventsWorkerStore(SQLBaseStore): if hs.config.worker.writers.events == hs.get_instance_name(): # We are the process in charge of generating stream ids for events, # so instantiate ID generators based on the database - self._stream_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering", - ) + if isinstance(database.engine, PostgresEngine): + self._stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + instance_name=hs.get_instance_name(), + table="events", + instance_column="instance_name", + id_column="stream_ordering", + sequence_name="events_stream_seq", + ) + else: + self._stream_id_gen = StreamIdGenerator( + db_conn, "events", "stream_ordering", + ) + self._backfill_id_gen = StreamIdGenerator( db_conn, "events", @@ -95,7 +108,15 @@ class EventsWorkerStore(SQLBaseStore): # Another process is in charge of persisting events and generating # stream IDs: rely on the replication streams to let us know which # IDs we can process. - self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") + self._stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + instance_name=hs.get_instance_name(), + table="events", + instance_column="instance_name", + id_column="stream_ordering", + sequence_name="events_stream_seq", + ) self._backfill_id_gen = SlavedIdTracker( db_conn, "events", "stream_ordering", step=-1 ) @@ -113,7 +134,7 @@ class EventsWorkerStore(SQLBaseStore): def process_replication_rows(self, stream_name, instance_name, token, rows): if stream_name == EventsStream.NAME: - self._stream_id_gen.advance(token) + self._stream_id_gen.advance(instance_name, token) elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(-token) diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql new file mode 100644 index 0000000000..98ff76d709 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql @@ -0,0 +1,16 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres new file mode 100644 index 0000000000..7901f89941 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres @@ -0,0 +1,20 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS events_stream_seq; + +SELECT setval('events_stream_seq', ( + SELECT COALESCE(MAX(stream_ordering), 1) FROM events +)); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index b63a71ed97..90663f3443 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -15,6 +15,7 @@ import contextlib import heapq +import logging import threading from collections import deque from typing import Dict, List, Set @@ -24,6 +25,8 @@ from typing_extensions import Deque from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.util.sequence import PostgresSequenceGenerator +logger = logging.getLogger(__name__) + class IdGenerator(object): def __init__(self, db_conn, table, column): @@ -146,7 +149,7 @@ class StreamIdGenerator(object): return manager() - def get_current_token(self): + def get_current_token(self, instance_name=None): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. @@ -244,9 +247,12 @@ class MultiWriterIdGenerator: return current_positions - def _load_next_id_txn(self, txn): + def _load_next_id_txn(self, txn) -> int: return self._sequence_gen.get_next_id_txn(txn) + def _load_next_mult_id_txn(self, txn, n: int) -> List[int]: + return self._sequence_gen.get_next_mult_txn(txn, n) + async def get_next(self): """ Usage: @@ -272,6 +278,34 @@ class MultiWriterIdGenerator: return manager() + async def get_next_mult(self, n: int): + """ + Usage: + with await stream_id_gen.get_next_mult(5) as stream_ids: + # ... persist event ... + """ + next_ids = await self._db.runInteraction( + "_load_next_mult_id", self._load_next_mult_id_txn, n + ) + + # Assert the fetched ID is actually greater than what we currently + # believe the ID to be. If not, then the sequence and table have got + # out of sync somehow. + assert self.get_current_token() < min(next_ids) + + with self._lock: + self._unfinished_ids.update(next_ids) + + @contextlib.contextmanager + def manager(): + try: + yield next_ids + finally: + for i in next_ids: + self._mark_id_as_finished(i) + + return manager() + def get_next_txn(self, txn: LoggingTransaction): """ Usage: @@ -311,7 +345,7 @@ class MultiWriterIdGenerator: # Currently we don't support this operation, as it's not obvious how to # condense the stream positions of multiple writers into a single int. - raise NotImplementedError() + return self.get_persisted_upto_position() def get_current_token_for_writer(self, instance_name: str) -> int: """Returns the position of the given writer. @@ -379,3 +413,9 @@ class MultiWriterIdGenerator: # There was a gap in seen positions, so there is nothing more to # do. break + + logger.info( + "Got new_id: %s, setting persited pos to %s", + new_id, + self._persisted_upto_position, + ) diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 63dfea4220..f60237ed03 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -14,7 +14,7 @@ # limitations under the License. import abc import threading -from typing import Callable, Optional +from typing import Callable, List, Optional from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.types import Cursor @@ -39,6 +39,12 @@ class PostgresSequenceGenerator(SequenceGenerator): txn.execute("SELECT nextval(?)", (self._sequence_name,)) return txn.fetchone()[0] + def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]: + txn.execute( + "SELECT nextval(?) FROM generate_series(1, ?)", (self._sequence_name, n) + ) + return [i for i, in txn] + GetFirstCallbackType = Callable[[Cursor], int] |