diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index d4db1e452e..5fd47706ef 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -201,48 +201,25 @@ class SlavedEventStore(BaseSlavedStore):
result["backfill"] = -self._backfill_id_gen.get_current_token()
return result
- def process_replication(self, result):
- stream = result.get("events")
- if stream:
- self._stream_id_gen.advance(int(stream["position"]))
-
- if stream["rows"]:
- logger.info("Got %d event rows", len(stream["rows"]))
-
- for row in stream["rows"]:
- self._process_replication_row(
- row, backfilled=False,
+ def process_replication_rows(self, stream_name, token, rows):
+ if stream_name == "events":
+ self._stream_id_gen.advance(token)
+ for row in rows:
+ self.invalidate_caches_for_event(
+ token, row.event_id, row.room_id, row.type, row.state_key,
+ row.redacts,
+ backfilled=False,
)
-
- stream = result.get("backfill")
- if stream:
- self._backfill_id_gen.advance(-int(stream["position"]))
- for row in stream["rows"]:
- self._process_replication_row(
- row, backfilled=True,
+ elif stream_name == "backfill":
+ self._backfill_id_gen.advance(-token)
+ for row in rows:
+ self.invalidate_caches_for_event(
+ -token, row.event_id, row.room_id, row.type, row.state_key,
+ row.redacts,
+ backfilled=True,
)
-
- stream = result.get("forward_ex_outliers")
- if stream:
- self._stream_id_gen.advance(int(stream["position"]))
- for row in stream["rows"]:
- event_id = row[1]
- self._invalidate_get_event_cache(event_id)
-
- stream = result.get("backward_ex_outliers")
- if stream:
- self._backfill_id_gen.advance(-int(stream["position"]))
- for row in stream["rows"]:
- event_id = row[1]
- self._invalidate_get_event_cache(event_id)
-
- return super(SlavedEventStore, self).process_replication(result)
-
- def _process_replication_row(self, row, backfilled):
- stream_ordering = row[0] if not backfilled else -row[0]
- self.invalidate_caches_for_event(
- stream_ordering, row[1], row[2], row[3], row[4], row[5],
- backfilled=backfilled,
+ return super(SlavedEventStore, self).process_replication_rows(
+ stream_name, token, rows
)
def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
|