diff --git a/changelog.d/7116.misc b/changelog.d/7116.misc
new file mode 100644
index 0000000000..89d90bd49e
--- /dev/null
+++ b/changelog.d/7116.misc
@@ -0,0 +1 @@
+Convert `*StreamRow` classes to inner classes.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 136babe6ce..c8fd8909a4 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -804,7 +804,7 @@ class FederationSenderHandler(object):
async def _on_new_receipts(self, rows):
"""
Args:
- rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+ rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]):
new receipts to be processed
"""
for receipt in rows:
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 876fb0e245..e1700ca8aa 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows):
Args:
transaction_queue (FederationSender)
- rows (list(synapse.replication.tcp.streams.FederationStreamRow))
+ rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
"""
# The federation stream contains a bunch of different types of
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index abf5c6c6a8..32d9514883 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -28,94 +28,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 500000
-BackfillStreamRow = namedtuple(
- "BackfillStreamRow",
- (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
- "relates_to", # str, optional
- ),
-)
-PresenceStreamRow = namedtuple(
- "PresenceStreamRow",
- (
- "user_id", # str
- "state", # str
- "last_active_ts", # int
- "last_federation_update_ts", # int
- "last_user_sync_ts", # int
- "status_msg", # str
- "currently_active", # bool
- ),
-)
-TypingStreamRow = namedtuple(
- "TypingStreamRow", ("room_id", "user_ids") # str # list(str)
-)
-ReceiptsStreamRow = namedtuple(
- "ReceiptsStreamRow",
- (
- "room_id", # str
- "receipt_type", # str
- "user_id", # str
- "event_id", # str
- "data", # dict
- ),
-)
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
-PushersStreamRow = namedtuple(
- "PushersStreamRow",
- ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
-)
-
-
-@attr.s
-class CachesStreamRow:
- """Stream to inform workers they should invalidate their cache.
-
- Attributes:
- cache_func: Name of the cached function.
- keys: The entry in the cache to invalidate. If None then will
- invalidate all.
- invalidation_ts: Timestamp of when the invalidation took place.
- """
-
- cache_func = attr.ib(type=str)
- keys = attr.ib(type=Optional[List[Any]])
- invalidation_ts = attr.ib(type=int)
-
-
-PublicRoomsStreamRow = namedtuple(
- "PublicRoomsStreamRow",
- (
- "room_id", # str
- "visibility", # str
- "appservice_id", # str, optional
- "network_id", # str, optional
- ),
-)
-
-
-@attr.s
-class DeviceListsStreamRow:
- entity = attr.ib(type=str)
-
-
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
-TagAccountDataStreamRow = namedtuple(
- "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
-)
-AccountDataStreamRow = namedtuple(
- "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
-)
-GroupsStreamRow = namedtuple(
- "GroupsStreamRow",
- ("group_id", "user_id", "type", "content"), # str # str # str # dict
-)
-UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
-
class Stream(object):
"""Base class for the streams.
@@ -234,6 +146,18 @@ class BackfillStream(Stream):
or it went from being an outlier to not.
"""
+ BackfillStreamRow = namedtuple(
+ "BackfillStreamRow",
+ (
+ "event_id", # str
+ "room_id", # str
+ "type", # str
+ "state_key", # str, optional
+ "redacts", # str, optional
+ "relates_to", # str, optional
+ ),
+ )
+
NAME = "backfill"
ROW_TYPE = BackfillStreamRow
@@ -246,6 +170,19 @@ class BackfillStream(Stream):
class PresenceStream(Stream):
+ PresenceStreamRow = namedtuple(
+ "PresenceStreamRow",
+ (
+ "user_id", # str
+ "state", # str
+ "last_active_ts", # int
+ "last_federation_update_ts", # int
+ "last_user_sync_ts", # int
+ "status_msg", # str
+ "currently_active", # bool
+ ),
+ )
+
NAME = "presence"
ROW_TYPE = PresenceStreamRow
@@ -260,6 +197,10 @@ class PresenceStream(Stream):
class TypingStream(Stream):
+ TypingStreamRow = namedtuple(
+ "TypingStreamRow", ("room_id", "user_ids") # str # list(str)
+ )
+
NAME = "typing"
ROW_TYPE = TypingStreamRow
@@ -273,6 +214,17 @@ class TypingStream(Stream):
class ReceiptsStream(Stream):
+ ReceiptsStreamRow = namedtuple(
+ "ReceiptsStreamRow",
+ (
+ "room_id", # str
+ "receipt_type", # str
+ "user_id", # str
+ "event_id", # str
+ "data", # dict
+ ),
+ )
+
NAME = "receipts"
ROW_TYPE = ReceiptsStreamRow
@@ -289,6 +241,8 @@ class PushRulesStream(Stream):
"""A user has changed their push rules
"""
+ PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
+
NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow
@@ -309,6 +263,11 @@ class PushersStream(Stream):
"""A user has added/changed/removed a pusher
"""
+ PushersStreamRow = namedtuple(
+ "PushersStreamRow",
+ ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
+ )
+
NAME = "pushers"
ROW_TYPE = PushersStreamRow
@@ -326,6 +285,21 @@ class CachesStream(Stream):
the cache on the workers
"""
+ @attr.s
+ class CachesStreamRow:
+ """Stream to inform workers they should invalidate their cache.
+
+ Attributes:
+ cache_func: Name of the cached function.
+ keys: The entry in the cache to invalidate. If None then will
+ invalidate all.
+ invalidation_ts: Timestamp of when the invalidation took place.
+ """
+
+ cache_func = attr.ib(type=str)
+ keys = attr.ib(type=Optional[List[Any]])
+ invalidation_ts = attr.ib(type=int)
+
NAME = "caches"
ROW_TYPE = CachesStreamRow
@@ -342,6 +316,16 @@ class PublicRoomsStream(Stream):
"""The public rooms list changed
"""
+ PublicRoomsStreamRow = namedtuple(
+ "PublicRoomsStreamRow",
+ (
+ "room_id", # str
+ "visibility", # str
+ "appservice_id", # str, optional
+ "network_id", # str, optional
+ ),
+ )
+
NAME = "public_rooms"
ROW_TYPE = PublicRoomsStreamRow
@@ -359,6 +343,10 @@ class DeviceListsStream(Stream):
told about a device update.
"""
+ @attr.s
+ class DeviceListsStreamRow:
+ entity = attr.ib(type=str)
+
NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow
@@ -375,6 +363,8 @@ class ToDeviceStream(Stream):
"""New to_device messages for a client
"""
+ ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
+
NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow
@@ -391,6 +381,10 @@ class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
+ TagAccountDataStreamRow = namedtuple(
+ "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
+ )
+
NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow
@@ -407,6 +401,10 @@ class AccountDataStream(Stream):
"""Global or per room account data was changed
"""
+ AccountDataStreamRow = namedtuple(
+ "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
+ )
+
NAME = "account_data"
ROW_TYPE = AccountDataStreamRow
@@ -432,6 +430,11 @@ class AccountDataStream(Stream):
class GroupServerStream(Stream):
+ GroupsStreamRow = namedtuple(
+ "GroupsStreamRow",
+ ("group_id", "user_id", "type", "content"), # str # str # str # dict
+ )
+
NAME = "groups"
ROW_TYPE = GroupsStreamRow
@@ -448,6 +451,8 @@ class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key
"""
+ UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
+
NAME = "user_signature"
ROW_TYPE = UserSignatureStreamRow
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 615f3dc9ac..f5f9336430 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -17,20 +17,20 @@ from collections import namedtuple
from ._base import Stream
-FederationStreamRow = namedtuple(
- "FederationStreamRow",
- (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
- ),
-)
-
class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""
+ FederationStreamRow = namedtuple(
+ "FederationStreamRow",
+ (
+ "type", # str, the type of data as defined in the BaseFederationRows
+ "data", # dict, serialization of a federation.send_queue.BaseFederationRow
+ ),
+ )
+
NAME = "federation"
ROW_TYPE = FederationStreamRow
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index d5a99f6caa..fa2493cad6 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.replication.tcp.streams._base import ReceiptsStreamRow
+from synapse.replication.tcp.streams._base import ReceiptsStream
from tests.replication.tcp.streams._base import BaseStreamTestCase
@@ -38,7 +38,7 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
rdata_rows = self.test_handler.received_rdata_rows
self.assertEqual(1, len(rdata_rows))
self.assertEqual(rdata_rows[0][0], "receipts")
- row = rdata_rows[0][2] # type: ReceiptsStreamRow
+ row = rdata_rows[0][2] # type: ReceiptsStream.ReceiptsStreamRow
self.assertEqual(ROOM_ID, row.room_id)
self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id)
|