diff options
author | Erik Johnston <erik@matrix.org> | 2020-09-24 16:53:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-24 16:53:51 +0100 |
commit | f112cfe5bb2c918c9e942941686a05664d8bd7da (patch) | |
tree | 7e58da475bcfe5ddb8c1191d1ef20c32069f5f5f /synapse/replication | |
parent | Add type annotations to SimpleHttpClient (#8372) (diff) | |
download | synapse-f112cfe5bb2c918c9e942941686a05664d8bd7da.tar.xz |
Fix MultiWriteIdGenerator's handling of restarts. (#8374)
On startup `MultiWriteIdGenerator` fetches the maximum stream ID for each instance from the table and uses that as its initial "current position" for each writer. This is problematic as a) it involves either a scan of events table or an index (neither of which is ideal), and b) if rows are being persisted out of order elsewhere while the process restarts then using the maximum stream ID is not correct. This could theoretically lead to race conditions where e.g. events that are persisted out of order are not sent down sync streams. We fix this by creating a new table that tracks the current positions of each writer to the stream, and update it each time we finish persisting a new entry. This is a relatively small overhead when persisting events. However for the cache invalidation stream this is a much bigger relative overhead, so instead we note that for invalidation we don't actually care about reliability over restarts (as there's no caches to invalidate) and simply don't bother reading and writing to the new table in that particular case.
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/slave/storage/_base.py | 2 |
1 files changed, 2 insertions, 0 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index d25fa49e1a..d0089fe06c 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -31,11 +31,13 @@ class BaseSlavedStore(CacheInvalidationWorkerStore): self._cache_id_gen = MultiWriterIdGenerator( db_conn, database, + stream_name="caches", instance_name=hs.get_instance_name(), table="cache_invalidation_stream_by_instance", instance_column="instance_name", id_column="stream_id", sequence_name="cache_invalidation_stream_seq", + writers=[], ) # type: Optional[MultiWriterIdGenerator] else: self._cache_id_gen = None |