summary refs log tree commit diff
path: root/synapse/replication/http
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2023-03-22 12:53:55 +0000
committerGitHub <noreply@github.com>2023-03-22 12:53:55 +0000
commit1bc9985eb7feca2a8eb4a0125cd03dfa3ac631fe (patch)
tree4e54f44cb2f852316af725519e6797fdd477cbb8 /synapse/replication/http
parentChange the parameter `immediate` of `send_device_messages` to default to `Tru... (diff)
downloadsynapse-1bc9985eb7feca2a8eb4a0125cd03dfa3ac631fe.tar.xz
Have replication clients remove _INT_STREAM_POS (#15309)
* Have replication clients remove _INT_STREAM_POS

Suppose worker A makes an internal http request from worker B. B may
make changes that A later learns about over replication. We want A's
request to block until it has seen those changes—mainly to ensure A's
caches are invalidated promptly. This helps provide read-after-write
consistency, eliminating entire categories of races and test flakes.

To implement this, B includes a top-level field `_INT_STREAM_POS` in its
response JSON. Roughly speaking, the field's value tells A what to wait
for. But we weren't removing that internal field before A's request
completed!

Introduced in https://github.com/matrix-org/synapse/pull/14820.
Fixes #15308.

* Changelog
Diffstat (limited to 'synapse/replication/http')
-rw-r--r--synapse/replication/http/_base.py2
1 files changed, 1 insertions, 1 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index c20d9c7e9d..8c2c54c07a 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -345,7 +345,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                 _outgoing_request_counter.labels(cls.NAME, 200).inc()
 
                 # Wait on any streams that the remote may have written to.
-                for stream_name, position in result.get(
+                for stream_name, position in result.pop(
                     _STREAM_POSITION_KEY, {}
                 ).items():
                     await replication.wait_for_stream_position(