diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index b6ce7a7bee..7ef67a5a73 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -26,78 +26,75 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
-BackfillStreamRow = namedtuple("BackfillStreamRow", (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
- "relates_to", # str, optional
-))
-PresenceStreamRow = namedtuple("PresenceStreamRow", (
- "user_id", # str
- "state", # str
- "last_active_ts", # int
- "last_federation_update_ts", # int
- "last_user_sync_ts", # int
- "status_msg", # str
- "currently_active", # bool
-))
-TypingStreamRow = namedtuple("TypingStreamRow", (
- "room_id", # str
- "user_ids", # list(str)
-))
-ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
- "room_id", # str
- "receipt_type", # str
- "user_id", # str
- "event_id", # str
- "data", # dict
-))
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
- "user_id", # str
-))
-PushersStreamRow = namedtuple("PushersStreamRow", (
- "user_id", # str
- "app_id", # str
- "pushkey", # str
- "deleted", # bool
-))
-CachesStreamRow = namedtuple("CachesStreamRow", (
- "cache_func", # str
- "keys", # list(str)
- "invalidation_ts", # int
-))
-PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
- "room_id", # str
- "visibility", # str
- "appservice_id", # str, optional
- "network_id", # str, optional
-))
-DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
- "user_id", # str
- "destination", # str
-))
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
- "entity", # str
-))
-TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
- "user_id", # str
- "room_id", # str
- "data", # dict
-))
-AccountDataStreamRow = namedtuple("AccountDataStream", (
- "user_id", # str
- "room_id", # str
- "data_type", # str
- "data", # dict
-))
-GroupsStreamRow = namedtuple("GroupsStreamRow", (
- "group_id", # str
- "user_id", # str
- "type", # str
- "content", # dict
-))
+BackfillStreamRow = namedtuple(
+ "BackfillStreamRow",
+ (
+ "event_id", # str
+ "room_id", # str
+ "type", # str
+ "state_key", # str, optional
+ "redacts", # str, optional
+ "relates_to", # str, optional
+ ),
+)
+PresenceStreamRow = namedtuple(
+ "PresenceStreamRow",
+ (
+ "user_id", # str
+ "state", # str
+ "last_active_ts", # int
+ "last_federation_update_ts", # int
+ "last_user_sync_ts", # int
+ "status_msg", # str
+ "currently_active", # bool
+ ),
+)
+TypingStreamRow = namedtuple(
+ "TypingStreamRow", ("room_id", "user_ids") # str # list(str)
+)
+ReceiptsStreamRow = namedtuple(
+ "ReceiptsStreamRow",
+ (
+ "room_id", # str
+ "receipt_type", # str
+ "user_id", # str
+ "event_id", # str
+ "data", # dict
+ ),
+)
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
+PushersStreamRow = namedtuple(
+ "PushersStreamRow",
+ ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
+)
+CachesStreamRow = namedtuple(
+ "CachesStreamRow",
+ ("cache_func", "keys", "invalidation_ts"), # str # list(str) # int
+)
+PublicRoomsStreamRow = namedtuple(
+ "PublicRoomsStreamRow",
+ (
+ "room_id", # str
+ "visibility", # str
+ "appservice_id", # str, optional
+ "network_id", # str, optional
+ ),
+)
+DeviceListsStreamRow = namedtuple(
+ "DeviceListsStreamRow", ("user_id", "destination") # str # str
+)
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
+TagAccountDataStreamRow = namedtuple(
+ "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
+)
+AccountDataStreamRow = namedtuple(
+ "AccountDataStream",
+ ("user_id", "room_id", "data_type", "data"), # str # str # str # dict
+)
+GroupsStreamRow = namedtuple(
+ "GroupsStreamRow",
+ ("group_id", "user_id", "type", "content"), # str # str # str # dict
+)
class Stream(object):
@@ -106,6 +103,7 @@ class Stream(object):
Provides a `get_updates()` function that returns new updates since the last
time it was called up until the point `advance_current_token` was called.
"""
+
NAME = None # The name of the stream
ROW_TYPE = None # The type of the row. Used by the default impl of parse_row.
_LIMITED = True # Whether the update function takes a limit
@@ -185,16 +183,13 @@ class Stream(object):
if self._LIMITED:
rows = yield self.update_function(
- from_token, current_token,
- limit=MAX_EVENTS_BEHIND + 1,
+ from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
)
# never turn more than MAX_EVENTS_BEHIND + 1 into updates.
rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
else:
- rows = yield self.update_function(
- from_token, current_token,
- )
+ rows = yield self.update_function(from_token, current_token)
updates = [(row[0], row[1:]) for row in rows]
@@ -230,6 +225,7 @@ class BackfillStream(Stream):
"""We fetched some old events and either we had never seen that event before
or it went from being an outlier to not.
"""
+
NAME = "backfill"
ROW_TYPE = BackfillStreamRow
@@ -286,6 +282,7 @@ class ReceiptsStream(Stream):
class PushRulesStream(Stream):
"""A user has changed their push rules
"""
+
NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow
@@ -306,6 +303,7 @@ class PushRulesStream(Stream):
class PushersStream(Stream):
"""A user has added/changed/removed a pusher
"""
+
NAME = "pushers"
ROW_TYPE = PushersStreamRow
@@ -322,6 +320,7 @@ class CachesStream(Stream):
"""A cache was invalidated on the master and no other stream would invalidate
the cache on the workers
"""
+
NAME = "caches"
ROW_TYPE = CachesStreamRow
@@ -337,6 +336,7 @@ class CachesStream(Stream):
class PublicRoomsStream(Stream):
"""The public rooms list changed
"""
+
NAME = "public_rooms"
ROW_TYPE = PublicRoomsStreamRow
@@ -352,6 +352,7 @@ class PublicRoomsStream(Stream):
class DeviceListsStream(Stream):
"""Someone added/changed/removed a device
"""
+
NAME = "device_lists"
_LIMITED = False
ROW_TYPE = DeviceListsStreamRow
@@ -368,6 +369,7 @@ class DeviceListsStream(Stream):
class ToDeviceStream(Stream):
"""New to_device messages for a client
"""
+
NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow
@@ -383,6 +385,7 @@ class ToDeviceStream(Stream):
class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
+
NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow
@@ -398,6 +401,7 @@ class TagAccountDataStream(Stream):
class AccountDataStream(Stream):
"""Global or per room account data was changed
"""
+
NAME = "account_data"
ROW_TYPE = AccountDataStreamRow
@@ -416,7 +420,7 @@ class AccountDataStream(Stream):
results = list(room_results)
results.extend(
- (stream_id, user_id, None, account_data_type, content,)
+ (stream_id, user_id, None, account_data_type, content)
for stream_id, user_id, account_data_type, content in global_results
)
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f1290d022a..3d0694bb11 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -52,6 +52,7 @@ data part are:
@attr.s(slots=True, frozen=True)
class EventsStreamRow(object):
"""A parsed row from the events replication stream"""
+
type = attr.ib() # str: the TypeId of one of the *EventsStreamRows
data = attr.ib() # BaseEventsStreamRow
@@ -80,11 +81,11 @@ class BaseEventsStreamRow(object):
class EventsStreamEventRow(BaseEventsStreamRow):
TypeId = "ev"
- event_id = attr.ib() # str
- room_id = attr.ib() # str
- type = attr.ib() # str
- state_key = attr.ib() # str, optional
- redacts = attr.ib() # str, optional
+ event_id = attr.ib() # str
+ room_id = attr.ib() # str
+ type = attr.ib() # str
+ state_key = attr.ib() # str, optional
+ redacts = attr.ib() # str, optional
relates_to = attr.ib() # str, optional
@@ -92,24 +93,21 @@ class EventsStreamEventRow(BaseEventsStreamRow):
class EventsStreamCurrentStateRow(BaseEventsStreamRow):
TypeId = "state"
- room_id = attr.ib() # str
- type = attr.ib() # str
+ room_id = attr.ib() # str
+ type = attr.ib() # str
state_key = attr.ib() # str
- event_id = attr.ib() # str, optional
+ event_id = attr.ib() # str, optional
TypeToRow = {
- Row.TypeId: Row
- for Row in (
- EventsStreamEventRow,
- EventsStreamCurrentStateRow,
- )
+ Row.TypeId: Row for Row in (EventsStreamEventRow, EventsStreamCurrentStateRow)
}
class EventsStream(Stream):
"""We received a new event, or an event went from being an outlier to not
"""
+
NAME = "events"
def __init__(self, hs):
@@ -121,19 +119,17 @@ class EventsStream(Stream):
@defer.inlineCallbacks
def update_function(self, from_token, current_token, limit=None):
event_rows = yield self._store.get_all_new_forward_event_rows(
- from_token, current_token, limit,
+ from_token, current_token, limit
)
event_updates = (
- (row[0], EventsStreamEventRow.TypeId, row[1:])
- for row in event_rows
+ (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
)
state_rows = yield self._store.get_all_updated_current_state_deltas(
from_token, current_token, limit
)
state_updates = (
- (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
- for row in state_rows
+ (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
)
all_updates = heapq.merge(event_updates, state_updates)
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 9aa43aa8d2..dc2484109d 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -17,16 +17,20 @@ from collections import namedtuple
from ._base import Stream
-FederationStreamRow = namedtuple("FederationStreamRow", (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
-))
+FederationStreamRow = namedtuple(
+ "FederationStreamRow",
+ (
+ "type", # str, the type of data as defined in the BaseFederationRows
+ "data", # dict, serialization of a federation.send_queue.BaseFederationRow
+ ),
+)
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
+
NAME = "federation"
ROW_TYPE = FederationStreamRow
|