diff options
author | Erik Johnston <erik@matrix.org> | 2016-09-15 11:47:23 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-09-15 11:47:23 +0100 |
commit | 211786ecd629588f2481c94217a4a388b090c993 (patch) | |
tree | 0ba7a3190107ffc2c87d10546ed111ad9f68d7f2 /synapse/replication/slave/storage/room.py | |
parent | Base public room list off of public_rooms stream (diff) | |
download | synapse-211786ecd629588f2481c94217a4a388b090c993.tar.xz |
Stream public room changes down replication
Diffstat (limited to 'synapse/replication/slave/storage/room.py')
-rw-r--r-- | synapse/replication/slave/storage/room.py | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index d5bb0f98ea..81743941dc 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -15,7 +15,38 @@ from ._base import BaseSlavedStore from synapse.storage import DataStore +from ._slaved_id_tracker import SlavedIdTracker class RoomStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(RoomStore, self).__init__(db_conn, hs) + self._public_room_id_gen = SlavedIdTracker( + db_conn, "public_room_list_stream", "stream_id" + ) + get_public_room_ids = DataStore.get_public_room_ids.__func__ + get_current_public_room_stream_id = ( + DataStore.get_current_public_room_stream_id.__func__ + ) + get_public_room_ids_at_stream_id = ( + DataStore.get_public_room_ids_at_stream_id.__func__ + ) + get_public_room_ids_at_stream_id_txn = ( + DataStore.get_public_room_ids_at_stream_id_txn.__func__ + ) + get_published_at_stream_id_txn = ( + DataStore.get_published_at_stream_id_txn.__func__ + ) + + def stream_positions(self): + result = super(RoomStore, self).stream_positions() + result["public_rooms"] = self._public_room_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("public_rooms") + if stream: + self._public_room_id_gen.advance(int(stream["position"])) + + return super(RoomStore, self).process_replication(result) |