summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2016-04-27 15:54:48 +0100
committerMark Haines <mjark@negativecurvature.net>2016-04-27 15:54:48 +0100
commit21d188bf95b6a75437f33f1dca1dd0a75dae86b4 (patch)
treed5f1e30fa3c6c32084d5d65f6b6fa5055cd818ec
parentMerge pull request #754 from matrix-org/markjh/check_for_nop (diff)
parentFix backfill replication to advance the stream correctly (diff)
downloadsynapse-21d188bf95b6a75437f33f1dca1dd0a75dae86b4.tar.xz
Merge pull request #755 from matrix-org/markjh/right_direction
Fix backfill replication to advance the stream correctly
-rw-r--r--synapse/replication/resource.py2
-rw-r--r--synapse/replication/slave/storage/events.py4
2 files changed, 3 insertions, 3 deletions
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index e5c9a53929..149fc4c650 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -382,7 +382,7 @@ class _Writer(object):
             position = rows[-1][0]
 
         self.streams[name] = {
-            "position": str(position),
+            "position": position if type(position) is int else str(position),
             "field_names": fields,
             "rows": rows,
         }
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 5f37ba6995..86f00b6ff5 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -118,7 +118,7 @@ class SlavedEventStore(BaseSlavedStore):
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
-        result["backfill"] = self._backfill_id_gen.get_current_token()
+        result["backfill"] = -self._backfill_id_gen.get_current_token()
         return result
 
     def process_replication(self, result):
@@ -136,7 +136,7 @@ class SlavedEventStore(BaseSlavedStore):
 
         stream = result.get("backfill")
         if stream:
-            self._backfill_id_gen.advance(stream["position"])
+            self._backfill_id_gen.advance(-stream["position"])
             for row in stream["rows"]:
                 self._process_replication_row(
                     row, backfilled=True, state_resets=state_resets