summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-08-18 11:18:11 +0100
committerErik Johnston <erik@matrix.org>2020-08-24 15:49:22 +0100
commit98b7415fcd79a0e264a054964f57087222fb9e02 (patch)
tree0c4b8906676bc0d744bd6a96a4fa752154f50226
parentadd get_persisted_upto_position (diff)
downloadsynapse-98b7415fcd79a0e264a054964f57087222fb9e02.tar.xz
Add multiwriter for events
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/replication/tcp/streams/events.py4
-rw-r--r--synapse/storage/databases/main/events.py7
-rw-r--r--synapse/storage/databases/main/events_worker.py33
-rw-r--r--synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql16
-rw-r--r--synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres20
-rw-r--r--synapse/storage/util/id_generators.py46
-rw-r--r--synapse/storage/util/sequence.py8
8 files changed, 121 insertions, 15 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c955a86be0..67d9f95202 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -675,7 +675,7 @@ class EventCreationHandler(object):
                     event.event_id,
                     prev_event.event_id,
                 )
-                return await self.store.get_stream_id_for_event(prev_event.event_id)
+                return await self.store.get_stream_token_for_event(prev_event.event_id)
 
         return await self.handle_new_client_event(
             requester=requester, event=event, context=context, ratelimit=ratelimit
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 16c63ff4ec..3705618b4f 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import List, Tuple, Type
 
 import attr
 
-from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
+from ._base import Stream, StreamUpdateResult, Token
 
 """Handling of the 'events' replication stream
 
@@ -117,7 +117,7 @@ class EventsStream(Stream):
         self._store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
-            current_token_without_instance(self._store.get_current_events_token),
+            self._store._stream_id_gen.get_current_token_for_writer,
             self._update_function,
         )
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b90e6de2d5..989372193f 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -34,6 +34,7 @@ from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.search import SearchEntry
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import StateMap, get_domain_from_id
+from synapse.util.async_helpers import maybe_awaitable
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.iterutils import batch_iter
 
@@ -97,6 +98,7 @@ class PersistEventsStore:
         self.store = main_data_store
         self.database_engine = db.engine
         self._clock = hs.get_clock()
+        self._instance_name = hs.get_instance_name()
 
         self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
         self.is_mine_id = hs.is_mine_id
@@ -157,8 +159,8 @@ class PersistEventsStore:
                 len(events_and_contexts)
             )
         else:
-            stream_ordering_manager = self._stream_id_gen.get_next_mult(
-                len(events_and_contexts)
+            stream_ordering_manager = await maybe_awaitable(
+                self._stream_id_gen.get_next_mult(len(events_and_contexts))
             )
 
         with stream_ordering_manager as stream_orderings:
@@ -800,6 +802,7 @@ class PersistEventsStore:
             table="events",
             values=[
                 {
+                    "instance_name": self._instance_name,
                     "stream_ordering": event.internal_metadata.stream_ordering,
                     "topological_ordering": event.depth,
                     "depth": event.depth,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 4a3333c0db..8f46e04fa6 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -42,7 +42,8 @@ from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.types import Collection, get_domain_from_id
 from synapse.util.caches.descriptors import Cache, cached
 from synapse.util.iterutils import batch_iter
@@ -81,9 +82,21 @@ class EventsWorkerStore(SQLBaseStore):
         if hs.config.worker.writers.events == hs.get_instance_name():
             # We are the process in charge of generating stream ids for events,
             # so instantiate ID generators based on the database
-            self._stream_id_gen = StreamIdGenerator(
-                db_conn, "events", "stream_ordering",
-            )
+            if isinstance(database.engine, PostgresEngine):
+                self._stream_id_gen = MultiWriterIdGenerator(
+                    db_conn=db_conn,
+                    db=database,
+                    instance_name=hs.get_instance_name(),
+                    table="events",
+                    instance_column="instance_name",
+                    id_column="stream_ordering",
+                    sequence_name="events_stream_seq",
+                )
+            else:
+                self._stream_id_gen = StreamIdGenerator(
+                    db_conn, "events", "stream_ordering",
+                )
+
             self._backfill_id_gen = StreamIdGenerator(
                 db_conn,
                 "events",
@@ -95,7 +108,15 @@ class EventsWorkerStore(SQLBaseStore):
             # Another process is in charge of persisting events and generating
             # stream IDs: rely on the replication streams to let us know which
             # IDs we can process.
-            self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
+            self._stream_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                instance_name=hs.get_instance_name(),
+                table="events",
+                instance_column="instance_name",
+                id_column="stream_ordering",
+                sequence_name="events_stream_seq",
+            )
             self._backfill_id_gen = SlavedIdTracker(
                 db_conn, "events", "stream_ordering", step=-1
             )
@@ -113,7 +134,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
         if stream_name == EventsStream.NAME:
-            self._stream_id_gen.advance(token)
+            self._stream_id_gen.advance(instance_name, token)
         elif stream_name == BackfillStream.NAME:
             self._backfill_id_gen.advance(-token)
 
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
new file mode 100644
index 0000000000..98ff76d709
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql
@@ -0,0 +1,16 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
new file mode 100644
index 0000000000..7901f89941
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
+
+SELECT setval('events_stream_seq', (
+    SELECT COALESCE(MAX(stream_ordering), 1) FROM events
+));
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index b63a71ed97..90663f3443 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -15,6 +15,7 @@
 
 import contextlib
 import heapq
+import logging
 import threading
 from collections import deque
 from typing import Dict, List, Set
@@ -24,6 +25,8 @@ from typing_extensions import Deque
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.util.sequence import PostgresSequenceGenerator
 
+logger = logging.getLogger(__name__)
+
 
 class IdGenerator(object):
     def __init__(self, db_conn, table, column):
@@ -146,7 +149,7 @@ class StreamIdGenerator(object):
 
         return manager()
 
-    def get_current_token(self):
+    def get_current_token(self, instance_name=None):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
 
@@ -244,9 +247,12 @@ class MultiWriterIdGenerator:
 
         return current_positions
 
-    def _load_next_id_txn(self, txn):
+    def _load_next_id_txn(self, txn) -> int:
         return self._sequence_gen.get_next_id_txn(txn)
 
+    def _load_next_mult_id_txn(self, txn, n: int) -> List[int]:
+        return self._sequence_gen.get_next_mult_txn(txn, n)
+
     async def get_next(self):
         """
         Usage:
@@ -272,6 +278,34 @@ class MultiWriterIdGenerator:
 
         return manager()
 
+    async def get_next_mult(self, n: int):
+        """
+        Usage:
+            with await stream_id_gen.get_next_mult(5) as stream_ids:
+                # ... persist event ...
+        """
+        next_ids = await self._db.runInteraction(
+            "_load_next_mult_id", self._load_next_mult_id_txn, n
+        )
+
+        # Assert the fetched ID is actually greater than what we currently
+        # believe the ID to be. If not, then the sequence and table have got
+        # out of sync somehow.
+        assert self.get_current_token() < min(next_ids)
+
+        with self._lock:
+            self._unfinished_ids.update(next_ids)
+
+        @contextlib.contextmanager
+        def manager():
+            try:
+                yield next_ids
+            finally:
+                for i in next_ids:
+                    self._mark_id_as_finished(i)
+
+        return manager()
+
     def get_next_txn(self, txn: LoggingTransaction):
         """
         Usage:
@@ -311,7 +345,7 @@ class MultiWriterIdGenerator:
 
         # Currently we don't support this operation, as it's not obvious how to
         # condense the stream positions of multiple writers into a single int.
-        raise NotImplementedError()
+        return self.get_persisted_upto_position()
 
     def get_current_token_for_writer(self, instance_name: str) -> int:
         """Returns the position of the given writer.
@@ -379,3 +413,9 @@ class MultiWriterIdGenerator:
                 # There was a gap in seen positions, so there is nothing more to
                 # do.
                 break
+
+        logger.info(
+            "Got new_id: %s, setting persited pos to %s",
+            new_id,
+            self._persisted_upto_position,
+        )
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 63dfea4220..f60237ed03 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 import abc
 import threading
-from typing import Callable, Optional
+from typing import Callable, List, Optional
 
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.storage.types import Cursor
@@ -39,6 +39,12 @@ class PostgresSequenceGenerator(SequenceGenerator):
         txn.execute("SELECT nextval(?)", (self._sequence_name,))
         return txn.fetchone()[0]
 
+    def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
+        txn.execute(
+            "SELECT nextval(?) FROM generate_series(1, ?)", (self._sequence_name, n)
+        )
+        return [i for i, in txn]
+
 
 GetFirstCallbackType = Callable[[Cursor], int]