summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-06-15 08:44:54 -0400
committerGitHub <noreply@github.com>2020-06-15 08:44:54 -0400
commit7d2532be36dc116e130ad226a7462bb0e899aca4 (patch)
treeff008562381c65706509a752d383e165140536d5 /synapse/app/generic_worker.py
parentReplace iteritems/itervalues/iterkeys with native versions. (#7692) (diff)
downloadsynapse-7d2532be36dc116e130ad226a7462bb0e899aca4.tar.xz
Discard RDATA from already seen positions. (#7648)
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py5
1 files changed, 5 insertions, 0 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index f3ec2a34ec..53c488d211 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -738,6 +738,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         except Exception:
             logger.exception("Error processing replication")
 
+    async def on_position(self, stream_name: str, instance_name: str, token: int):
+        await super().on_position(stream_name, instance_name, token)
+        # Also call on_rdata to ensure that stream positions are properly reset.
+        await self.on_rdata(stream_name, instance_name, token, [])
+
     def stop_pusher(self, user_id, app_id, pushkey):
         if not self.notify_pushers:
             return