summary refs log tree commit diff
path: root/synapse/replication/slave/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-24 16:53:51 +0100
committerGitHub <noreply@github.com>2020-09-24 16:53:51 +0100
commitf112cfe5bb2c918c9e942941686a05664d8bd7da (patch)
tree7e58da475bcfe5ddb8c1191d1ef20c32069f5f5f /synapse/replication/slave/storage
parentAdd type annotations to SimpleHttpClient (#8372) (diff)
downloadsynapse-f112cfe5bb2c918c9e942941686a05664d8bd7da.tar.xz
Fix MultiWriteIdGenerator's handling of restarts. (#8374)
On startup `MultiWriteIdGenerator` fetches the maximum stream ID for
each instance from the table and uses that as its initial "current
position" for each writer. This is problematic as a) it involves either
a scan of events table or an index (neither of which is ideal), and b)
if rows are being persisted out of order elsewhere while the process
restarts then using the maximum stream ID is not correct. This could
theoretically lead to race conditions where e.g. events that are
persisted out of order are not sent down sync streams.

We fix this by creating a new table that tracks the current positions of
each writer to the stream, and update it each time we finish persisting
a new entry. This is a relatively small overhead when persisting events.
However for the cache invalidation stream this is a much bigger relative
overhead, so instead we note that for invalidation we don't actually
care about reliability over restarts (as there's no caches to
invalidate) and simply don't bother reading and writing to the new table
in that particular case.
Diffstat (limited to 'synapse/replication/slave/storage')
-rw-r--r--synapse/replication/slave/storage/_base.py2
1 files changed, 2 insertions, 0 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index d25fa49e1a..d0089fe06c 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -31,11 +31,13 @@ class BaseSlavedStore(CacheInvalidationWorkerStore):
             self._cache_id_gen = MultiWriterIdGenerator(
                 db_conn,
                 database,
+                stream_name="caches",
                 instance_name=hs.get_instance_name(),
                 table="cache_invalidation_stream_by_instance",
                 instance_column="instance_name",
                 id_column="stream_id",
                 sequence_name="cache_invalidation_stream_seq",
+                writers=[],
             )  # type: Optional[MultiWriterIdGenerator]
         else:
             self._cache_id_gen = None