summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2022-11-16 13:50:07 +0000
committerGitHub <noreply@github.com>2022-11-16 13:50:07 +0000
commitd63814fd736fed5d3d45ff3af5e6d3bfae50c439 (patch)
tree67e5ba3310b0c29bbe52268467774631db8452b7 /synapse/replication
parentDon't filter state in /context response (#14461) (diff)
downloadsynapse-d63814fd736fed5d3d45ff3af5e6d3bfae50c439.tar.xz
Revert "Remove slaved id tracker (#14376)" (#14463)
This reverts commit 36097e88c4da51fce6556a58c49bd675f4cf20ab.
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/slave/__init__.py13
-rw-r--r--synapse/replication/slave/storage/__init__.py13
-rw-r--r--synapse/replication/slave/storage/_slaved_id_tracker.py50
3 files changed, 76 insertions, 0 deletions
diff --git a/synapse/replication/slave/__init__.py b/synapse/replication/slave/__init__.py
new file mode 100644
index 0000000000..f43a360a80
--- /dev/null
+++ b/synapse/replication/slave/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/replication/slave/storage/__init__.py b/synapse/replication/slave/storage/__init__.py
new file mode 100644
index 0000000000..f43a360a80
--- /dev/null
+++ b/synapse/replication/slave/storage/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
new file mode 100644
index 0000000000..8f3f953ed4
--- /dev/null
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -0,0 +1,50 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import List, Optional, Tuple
+
+from synapse.storage.database import LoggingDatabaseConnection
+from synapse.storage.util.id_generators import AbstractStreamIdTracker, _load_current_id
+
+
+class SlavedIdTracker(AbstractStreamIdTracker):
+    """Tracks the "current" stream ID of a stream with a single writer.
+
+    See `AbstractStreamIdTracker` for more details.
+
+    Note that this class does not work correctly when there are multiple
+    writers.
+    """
+
+    def __init__(
+        self,
+        db_conn: LoggingDatabaseConnection,
+        table: str,
+        column: str,
+        extra_tables: Optional[List[Tuple[str, str]]] = None,
+        step: int = 1,
+    ):
+        self.step = step
+        self._current = _load_current_id(db_conn, table, column, step)
+        if extra_tables:
+            for table, column in extra_tables:
+                self.advance(None, _load_current_id(db_conn, table, column))
+
+    def advance(self, instance_name: Optional[str], new_id: int) -> None:
+        self._current = (max if self.step > 0 else min)(self._current, new_id)
+
+    def get_current_token(self) -> int:
+        return self._current
+
+    def get_current_token_for_writer(self, instance_name: str) -> int:
+        return self.get_current_token()