summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-03-23 13:59:11 +0000
committerGitHub <noreply@github.com>2020-03-23 13:59:11 +0000
commita564b92d37625855940fe599c730a9958c33f973 (patch)
treebe8d131a98ca583005723016a47caab0241e6009
parentMerge branch 'master' into develop (diff)
downloadsynapse-a564b92d37625855940fe599c730a9958c33f973.tar.xz
Convert `*StreamRow` classes to inner classes (#7116)
This just helps keep the rows closer to their streams, so that it's easier to
see what the format of each stream is.
-rw-r--r--changelog.d/7116.misc1
-rw-r--r--synapse/app/generic_worker.py2
-rw-r--r--synapse/federation/send_queue.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py181
-rw-r--r--synapse/replication/tcp/streams/federation.py16
-rw-r--r--tests/replication/tcp/streams/test_receipts.py4
6 files changed, 106 insertions, 100 deletions
diff --git a/changelog.d/7116.misc b/changelog.d/7116.misc
new file mode 100644
index 0000000000..89d90bd49e
--- /dev/null
+++ b/changelog.d/7116.misc
@@ -0,0 +1 @@
+Convert `*StreamRow` classes to inner classes.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 136babe6ce..c8fd8909a4 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -804,7 +804,7 @@ class FederationSenderHandler(object):
     async def _on_new_receipts(self, rows):
         """
         Args:
-            rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+            rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]):
                 new receipts to be processed
         """
         for receipt in rows:
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 876fb0e245..e1700ca8aa 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows):
 
     Args:
         transaction_queue (FederationSender)
-        rows (list(synapse.replication.tcp.streams.FederationStreamRow))
+        rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
     """
 
     # The federation stream contains a bunch of different types of
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index abf5c6c6a8..32d9514883 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -28,94 +28,6 @@ logger = logging.getLogger(__name__)
 
 MAX_EVENTS_BEHIND = 500000
 
-BackfillStreamRow = namedtuple(
-    "BackfillStreamRow",
-    (
-        "event_id",  # str
-        "room_id",  # str
-        "type",  # str
-        "state_key",  # str, optional
-        "redacts",  # str, optional
-        "relates_to",  # str, optional
-    ),
-)
-PresenceStreamRow = namedtuple(
-    "PresenceStreamRow",
-    (
-        "user_id",  # str
-        "state",  # str
-        "last_active_ts",  # int
-        "last_federation_update_ts",  # int
-        "last_user_sync_ts",  # int
-        "status_msg",  # str
-        "currently_active",  # bool
-    ),
-)
-TypingStreamRow = namedtuple(
-    "TypingStreamRow", ("room_id", "user_ids")  # str  # list(str)
-)
-ReceiptsStreamRow = namedtuple(
-    "ReceiptsStreamRow",
-    (
-        "room_id",  # str
-        "receipt_type",  # str
-        "user_id",  # str
-        "event_id",  # str
-        "data",  # dict
-    ),
-)
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))  # str
-PushersStreamRow = namedtuple(
-    "PushersStreamRow",
-    ("user_id", "app_id", "pushkey", "deleted"),  # str  # str  # str  # bool
-)
-
-
-@attr.s
-class CachesStreamRow:
-    """Stream to inform workers they should invalidate their cache.
-
-    Attributes:
-        cache_func: Name of the cached function.
-        keys: The entry in the cache to invalidate. If None then will
-            invalidate all.
-        invalidation_ts: Timestamp of when the invalidation took place.
-    """
-
-    cache_func = attr.ib(type=str)
-    keys = attr.ib(type=Optional[List[Any]])
-    invalidation_ts = attr.ib(type=int)
-
-
-PublicRoomsStreamRow = namedtuple(
-    "PublicRoomsStreamRow",
-    (
-        "room_id",  # str
-        "visibility",  # str
-        "appservice_id",  # str, optional
-        "network_id",  # str, optional
-    ),
-)
-
-
-@attr.s
-class DeviceListsStreamRow:
-    entity = attr.ib(type=str)
-
-
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))  # str
-TagAccountDataStreamRow = namedtuple(
-    "TagAccountDataStreamRow", ("user_id", "room_id", "data")  # str  # str  # dict
-)
-AccountDataStreamRow = namedtuple(
-    "AccountDataStream", ("user_id", "room_id", "data_type")  # str  # str  # str
-)
-GroupsStreamRow = namedtuple(
-    "GroupsStreamRow",
-    ("group_id", "user_id", "type", "content"),  # str  # str  # str  # dict
-)
-UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id"))  # str
-
 
 class Stream(object):
     """Base class for the streams.
@@ -234,6 +146,18 @@ class BackfillStream(Stream):
     or it went from being an outlier to not.
     """
 
+    BackfillStreamRow = namedtuple(
+        "BackfillStreamRow",
+        (
+            "event_id",  # str
+            "room_id",  # str
+            "type",  # str
+            "state_key",  # str, optional
+            "redacts",  # str, optional
+            "relates_to",  # str, optional
+        ),
+    )
+
     NAME = "backfill"
     ROW_TYPE = BackfillStreamRow
 
@@ -246,6 +170,19 @@ class BackfillStream(Stream):
 
 
 class PresenceStream(Stream):
+    PresenceStreamRow = namedtuple(
+        "PresenceStreamRow",
+        (
+            "user_id",  # str
+            "state",  # str
+            "last_active_ts",  # int
+            "last_federation_update_ts",  # int
+            "last_user_sync_ts",  # int
+            "status_msg",  # str
+            "currently_active",  # bool
+        ),
+    )
+
     NAME = "presence"
     ROW_TYPE = PresenceStreamRow
 
@@ -260,6 +197,10 @@ class PresenceStream(Stream):
 
 
 class TypingStream(Stream):
+    TypingStreamRow = namedtuple(
+        "TypingStreamRow", ("room_id", "user_ids")  # str  # list(str)
+    )
+
     NAME = "typing"
     ROW_TYPE = TypingStreamRow
 
@@ -273,6 +214,17 @@ class TypingStream(Stream):
 
 
 class ReceiptsStream(Stream):
+    ReceiptsStreamRow = namedtuple(
+        "ReceiptsStreamRow",
+        (
+            "room_id",  # str
+            "receipt_type",  # str
+            "user_id",  # str
+            "event_id",  # str
+            "data",  # dict
+        ),
+    )
+
     NAME = "receipts"
     ROW_TYPE = ReceiptsStreamRow
 
@@ -289,6 +241,8 @@ class PushRulesStream(Stream):
     """A user has changed their push rules
     """
 
+    PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))  # str
+
     NAME = "push_rules"
     ROW_TYPE = PushRulesStreamRow
 
@@ -309,6 +263,11 @@ class PushersStream(Stream):
     """A user has added/changed/removed a pusher
     """
 
+    PushersStreamRow = namedtuple(
+        "PushersStreamRow",
+        ("user_id", "app_id", "pushkey", "deleted"),  # str  # str  # str  # bool
+    )
+
     NAME = "pushers"
     ROW_TYPE = PushersStreamRow
 
@@ -326,6 +285,21 @@ class CachesStream(Stream):
     the cache on the workers
     """
 
+    @attr.s
+    class CachesStreamRow:
+        """Stream to inform workers they should invalidate their cache.
+
+        Attributes:
+            cache_func: Name of the cached function.
+            keys: The entry in the cache to invalidate. If None then will
+                invalidate all.
+            invalidation_ts: Timestamp of when the invalidation took place.
+        """
+
+        cache_func = attr.ib(type=str)
+        keys = attr.ib(type=Optional[List[Any]])
+        invalidation_ts = attr.ib(type=int)
+
     NAME = "caches"
     ROW_TYPE = CachesStreamRow
 
@@ -342,6 +316,16 @@ class PublicRoomsStream(Stream):
     """The public rooms list changed
     """
 
+    PublicRoomsStreamRow = namedtuple(
+        "PublicRoomsStreamRow",
+        (
+            "room_id",  # str
+            "visibility",  # str
+            "appservice_id",  # str, optional
+            "network_id",  # str, optional
+        ),
+    )
+
     NAME = "public_rooms"
     ROW_TYPE = PublicRoomsStreamRow
 
@@ -359,6 +343,10 @@ class DeviceListsStream(Stream):
     told about a device update.
     """
 
+    @attr.s
+    class DeviceListsStreamRow:
+        entity = attr.ib(type=str)
+
     NAME = "device_lists"
     ROW_TYPE = DeviceListsStreamRow
 
@@ -375,6 +363,8 @@ class ToDeviceStream(Stream):
     """New to_device messages for a client
     """
 
+    ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))  # str
+
     NAME = "to_device"
     ROW_TYPE = ToDeviceStreamRow
 
@@ -391,6 +381,10 @@ class TagAccountDataStream(Stream):
     """Someone added/removed a tag for a room
     """
 
+    TagAccountDataStreamRow = namedtuple(
+        "TagAccountDataStreamRow", ("user_id", "room_id", "data")  # str  # str  # dict
+    )
+
     NAME = "tag_account_data"
     ROW_TYPE = TagAccountDataStreamRow
 
@@ -407,6 +401,10 @@ class AccountDataStream(Stream):
     """Global or per room account data was changed
     """
 
+    AccountDataStreamRow = namedtuple(
+        "AccountDataStream", ("user_id", "room_id", "data_type")  # str  # str  # str
+    )
+
     NAME = "account_data"
     ROW_TYPE = AccountDataStreamRow
 
@@ -432,6 +430,11 @@ class AccountDataStream(Stream):
 
 
 class GroupServerStream(Stream):
+    GroupsStreamRow = namedtuple(
+        "GroupsStreamRow",
+        ("group_id", "user_id", "type", "content"),  # str  # str  # str  # dict
+    )
+
     NAME = "groups"
     ROW_TYPE = GroupsStreamRow
 
@@ -448,6 +451,8 @@ class UserSignatureStream(Stream):
     """A user has signed their own device with their user-signing key
     """
 
+    UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id"))  # str
+
     NAME = "user_signature"
     ROW_TYPE = UserSignatureStreamRow
 
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 615f3dc9ac..f5f9336430 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -17,20 +17,20 @@ from collections import namedtuple
 
 from ._base import Stream
 
-FederationStreamRow = namedtuple(
-    "FederationStreamRow",
-    (
-        "type",  # str, the type of data as defined in the BaseFederationRows
-        "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
-    ),
-)
-
 
 class FederationStream(Stream):
     """Data to be sent over federation. Only available when master has federation
     sending disabled.
     """
 
+    FederationStreamRow = namedtuple(
+        "FederationStreamRow",
+        (
+            "type",  # str, the type of data as defined in the BaseFederationRows
+            "data",  # dict, serialization of a federation.send_queue.BaseFederationRow
+        ),
+    )
+
     NAME = "federation"
     ROW_TYPE = FederationStreamRow
 
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index d5a99f6caa..fa2493cad6 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from synapse.replication.tcp.streams._base import ReceiptsStreamRow
+from synapse.replication.tcp.streams._base import ReceiptsStream
 
 from tests.replication.tcp.streams._base import BaseStreamTestCase
 
@@ -38,7 +38,7 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
         rdata_rows = self.test_handler.received_rdata_rows
         self.assertEqual(1, len(rdata_rows))
         self.assertEqual(rdata_rows[0][0], "receipts")
-        row = rdata_rows[0][2]  # type: ReceiptsStreamRow
+        row = rdata_rows[0][2]  # type: ReceiptsStream.ReceiptsStreamRow
         self.assertEqual(ROOM_ID, row.room_id)
         self.assertEqual("m.read", row.receipt_type)
         self.assertEqual(USER_ID, row.user_id)