diff options
Diffstat (limited to 'synapse/replication/tcp/streams/_base.py')
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 160 |
1 files changed, 82 insertions, 78 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index b6ce7a7bee..7ef67a5a73 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -26,78 +26,75 @@ logger = logging.getLogger(__name__) MAX_EVENTS_BEHIND = 10000 -BackfillStreamRow = namedtuple("BackfillStreamRow", ( - "event_id", # str - "room_id", # str - "type", # str - "state_key", # str, optional - "redacts", # str, optional - "relates_to", # str, optional -)) -PresenceStreamRow = namedtuple("PresenceStreamRow", ( - "user_id", # str - "state", # str - "last_active_ts", # int - "last_federation_update_ts", # int - "last_user_sync_ts", # int - "status_msg", # str - "currently_active", # bool -)) -TypingStreamRow = namedtuple("TypingStreamRow", ( - "room_id", # str - "user_ids", # list(str) -)) -ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", ( - "room_id", # str - "receipt_type", # str - "user_id", # str - "event_id", # str - "data", # dict -)) -PushRulesStreamRow = namedtuple("PushRulesStreamRow", ( - "user_id", # str -)) -PushersStreamRow = namedtuple("PushersStreamRow", ( - "user_id", # str - "app_id", # str - "pushkey", # str - "deleted", # bool -)) -CachesStreamRow = namedtuple("CachesStreamRow", ( - "cache_func", # str - "keys", # list(str) - "invalidation_ts", # int -)) -PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", ( - "room_id", # str - "visibility", # str - "appservice_id", # str, optional - "network_id", # str, optional -)) -DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ( - "user_id", # str - "destination", # str -)) -ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( - "entity", # str -)) -TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( - "user_id", # str - "room_id", # str - "data", # dict -)) -AccountDataStreamRow = namedtuple("AccountDataStream", ( - "user_id", # str - "room_id", # str - "data_type", # str - "data", # dict -)) -GroupsStreamRow = namedtuple("GroupsStreamRow", ( - "group_id", # str - "user_id", # str - "type", # str - "content", # dict -)) +BackfillStreamRow = namedtuple( + "BackfillStreamRow", + ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional + "relates_to", # str, optional + ), +) +PresenceStreamRow = namedtuple( + "PresenceStreamRow", + ( + "user_id", # str + "state", # str + "last_active_ts", # int + "last_federation_update_ts", # int + "last_user_sync_ts", # int + "status_msg", # str + "currently_active", # bool + ), +) +TypingStreamRow = namedtuple( + "TypingStreamRow", ("room_id", "user_ids") # str # list(str) +) +ReceiptsStreamRow = namedtuple( + "ReceiptsStreamRow", + ( + "room_id", # str + "receipt_type", # str + "user_id", # str + "event_id", # str + "data", # dict + ), +) +PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str +PushersStreamRow = namedtuple( + "PushersStreamRow", + ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool +) +CachesStreamRow = namedtuple( + "CachesStreamRow", + ("cache_func", "keys", "invalidation_ts"), # str # list(str) # int +) +PublicRoomsStreamRow = namedtuple( + "PublicRoomsStreamRow", + ( + "room_id", # str + "visibility", # str + "appservice_id", # str, optional + "network_id", # str, optional + ), +) +DeviceListsStreamRow = namedtuple( + "DeviceListsStreamRow", ("user_id", "destination") # str # str +) +ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str +TagAccountDataStreamRow = namedtuple( + "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict +) +AccountDataStreamRow = namedtuple( + "AccountDataStream", + ("user_id", "room_id", "data_type", "data"), # str # str # str # dict +) +GroupsStreamRow = namedtuple( + "GroupsStreamRow", + ("group_id", "user_id", "type", "content"), # str # str # str # dict +) class Stream(object): @@ -106,6 +103,7 @@ class Stream(object): Provides a `get_updates()` function that returns new updates since the last time it was called up until the point `advance_current_token` was called. """ + NAME = None # The name of the stream ROW_TYPE = None # The type of the row. Used by the default impl of parse_row. _LIMITED = True # Whether the update function takes a limit @@ -185,16 +183,13 @@ class Stream(object): if self._LIMITED: rows = yield self.update_function( - from_token, current_token, - limit=MAX_EVENTS_BEHIND + 1, + from_token, current_token, limit=MAX_EVENTS_BEHIND + 1 ) # never turn more than MAX_EVENTS_BEHIND + 1 into updates. rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1) else: - rows = yield self.update_function( - from_token, current_token, - ) + rows = yield self.update_function(from_token, current_token) updates = [(row[0], row[1:]) for row in rows] @@ -230,6 +225,7 @@ class BackfillStream(Stream): """We fetched some old events and either we had never seen that event before or it went from being an outlier to not. """ + NAME = "backfill" ROW_TYPE = BackfillStreamRow @@ -286,6 +282,7 @@ class ReceiptsStream(Stream): class PushRulesStream(Stream): """A user has changed their push rules """ + NAME = "push_rules" ROW_TYPE = PushRulesStreamRow @@ -306,6 +303,7 @@ class PushRulesStream(Stream): class PushersStream(Stream): """A user has added/changed/removed a pusher """ + NAME = "pushers" ROW_TYPE = PushersStreamRow @@ -322,6 +320,7 @@ class CachesStream(Stream): """A cache was invalidated on the master and no other stream would invalidate the cache on the workers """ + NAME = "caches" ROW_TYPE = CachesStreamRow @@ -337,6 +336,7 @@ class CachesStream(Stream): class PublicRoomsStream(Stream): """The public rooms list changed """ + NAME = "public_rooms" ROW_TYPE = PublicRoomsStreamRow @@ -352,6 +352,7 @@ class PublicRoomsStream(Stream): class DeviceListsStream(Stream): """Someone added/changed/removed a device """ + NAME = "device_lists" _LIMITED = False ROW_TYPE = DeviceListsStreamRow @@ -368,6 +369,7 @@ class DeviceListsStream(Stream): class ToDeviceStream(Stream): """New to_device messages for a client """ + NAME = "to_device" ROW_TYPE = ToDeviceStreamRow @@ -383,6 +385,7 @@ class ToDeviceStream(Stream): class TagAccountDataStream(Stream): """Someone added/removed a tag for a room """ + NAME = "tag_account_data" ROW_TYPE = TagAccountDataStreamRow @@ -398,6 +401,7 @@ class TagAccountDataStream(Stream): class AccountDataStream(Stream): """Global or per room account data was changed """ + NAME = "account_data" ROW_TYPE = AccountDataStreamRow @@ -416,7 +420,7 @@ class AccountDataStream(Stream): results = list(room_results) results.extend( - (stream_id, user_id, None, account_data_type, content,) + (stream_id, user_id, None, account_data_type, content) for stream_id, user_id, account_data_type, content in global_results ) |