summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-14 10:16:41 +0100
committerGitHub <noreply@github.com>2020-09-14 10:16:41 +0100
commit04cc249b43e8716513f788b2a4eeb8ede24d19df (patch)
tree1f5e539b9db71f3d47980ce99c47e6f1d747352b /synapse/storage/util
parentMerge tag 'v1.20.0rc3' into develop (diff)
downloadsynapse-04cc249b43e8716513f788b2a4eeb8ede24d19df.tar.xz
Add experimental support for sharding event persister. Again. (#8294)
This is *not* ready for production yet. Caveats:

1. We should write some tests...
2. The stream token that we use for events can get stalled at the minimum position of all writers. This means that new events may not be processed and e.g. sent down sync streams if a writer isn't writing or is slow.
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/id_generators.py10
1 files changed, 6 insertions, 4 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 2a66b3ad4e..1de2b91587 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -240,8 +240,12 @@ class MultiWriterIdGenerator:
         # gaps should be relatively rare it's still worth doing the book keeping
         # that allows us to skip forwards when there are gapless runs of
         # positions.
+        #
+        # We start at 1 here as a) the first generated stream ID will be 2, and
+        # b) other parts of the code assume that stream IDs are strictly greater
+        # than 0.
         self._persisted_upto_position = (
-            min(self._current_positions.values()) if self._current_positions else 0
+            min(self._current_positions.values()) if self._current_positions else 1
         )
         self._known_persisted_positions = []  # type: List[int]
 
@@ -398,9 +402,7 @@ class MultiWriterIdGenerator:
         equal to it have been successfully persisted.
         """
 
-        # Currently we don't support this operation, as it's not obvious how to
-        # condense the stream positions of multiple writers into a single int.
-        raise NotImplementedError()
+        return self.get_persisted_upto_position()
 
     def get_current_token_for_writer(self, instance_name: str) -> int:
         """Returns the position of the given writer.