From c165c1233b8ef244fadca97c7d465fdcf473d077 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 20 Mar 2020 16:24:22 +0100 Subject: Improve database configuration docs (#6988) Attempts to clarify the sample config for databases, and add some stuff about tcp keepalives to `postgres.md`. --- changelog.d/6988.doc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6988.doc (limited to 'changelog.d') diff --git a/changelog.d/6988.doc b/changelog.d/6988.doc new file mode 100644 index 0000000000..b6f71bb966 --- /dev/null +++ b/changelog.d/6988.doc @@ -0,0 +1 @@ +Improve the documentation for database configuration. -- cgit 1.4.1 From 477c4f5b1c2c7733d4b2cf578dc9aa8e048011b0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 20 Mar 2020 16:22:47 -0400 Subject: Clean-up some auth/login REST code (#7115) --- changelog.d/7115.misc | 1 + synapse/rest/client/v1/login.py | 8 ------ synapse/rest/client/v2_alpha/auth.py | 53 ++++++++++++++---------------------- 3 files changed, 21 insertions(+), 41 deletions(-) create mode 100644 changelog.d/7115.misc (limited to 'changelog.d') diff --git a/changelog.d/7115.misc b/changelog.d/7115.misc new file mode 100644 index 0000000000..7d4a011e3e --- /dev/null +++ b/changelog.d/7115.misc @@ -0,0 +1 @@ +De-duplicate / remove unused REST code for login and auth. diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index d0d4999795..31551524f8 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -28,7 +28,6 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, ) -from synapse.push.mailer import load_jinja2_templates from synapse.rest.client.v2_alpha._base import client_patterns from synapse.rest.well_known import WellKnownBuilder from synapse.types import UserID, map_username_to_mxid_localpart @@ -548,13 +547,6 @@ class SSOAuthHandler(object): self._registration_handler = hs.get_registration_handler() self._macaroon_gen = hs.get_macaroon_generator() - # Load the redirect page HTML template - self._template = load_jinja2_templates( - hs.config.sso_redirect_confirm_template_dir, ["sso_redirect_confirm.html"], - )[0] - - self._server_name = hs.config.server_name - # cast to tuple for use with str.startswith self._whitelisted_sso_clients = tuple(hs.config.sso_client_whitelist) diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index 50e080673b..85cf5a14c6 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -142,14 +142,6 @@ class AuthRestServlet(RestServlet): % (CLIENT_API_PREFIX, LoginType.RECAPTCHA), "sitekey": self.hs.config.recaptcha_public_key, } - html_bytes = html.encode("utf8") - request.setResponseCode(200) - request.setHeader(b"Content-Type", b"text/html; charset=utf-8") - request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) - - request.write(html_bytes) - finish_request(request) - return None elif stagetype == LoginType.TERMS: html = TERMS_TEMPLATE % { "session": session, @@ -158,17 +150,19 @@ class AuthRestServlet(RestServlet): "myurl": "%s/r0/auth/%s/fallback/web" % (CLIENT_API_PREFIX, LoginType.TERMS), } - html_bytes = html.encode("utf8") - request.setResponseCode(200) - request.setHeader(b"Content-Type", b"text/html; charset=utf-8") - request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) - - request.write(html_bytes) - finish_request(request) - return None else: raise SynapseError(404, "Unknown auth stage type") + # Render the HTML and return. + html_bytes = html.encode("utf8") + request.setResponseCode(200) + request.setHeader(b"Content-Type", b"text/html; charset=utf-8") + request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) + + request.write(html_bytes) + finish_request(request) + return None + async def on_POST(self, request, stagetype): session = parse_string(request, "session") @@ -196,15 +190,6 @@ class AuthRestServlet(RestServlet): % (CLIENT_API_PREFIX, LoginType.RECAPTCHA), "sitekey": self.hs.config.recaptcha_public_key, } - html_bytes = html.encode("utf8") - request.setResponseCode(200) - request.setHeader(b"Content-Type", b"text/html; charset=utf-8") - request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) - - request.write(html_bytes) - finish_request(request) - - return None elif stagetype == LoginType.TERMS: authdict = {"session": session} @@ -225,17 +210,19 @@ class AuthRestServlet(RestServlet): "myurl": "%s/r0/auth/%s/fallback/web" % (CLIENT_API_PREFIX, LoginType.TERMS), } - html_bytes = html.encode("utf8") - request.setResponseCode(200) - request.setHeader(b"Content-Type", b"text/html; charset=utf-8") - request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) - - request.write(html_bytes) - finish_request(request) - return None else: raise SynapseError(404, "Unknown auth stage type") + # Render the HTML and return. + html_bytes = html.encode("utf8") + request.setResponseCode(200) + request.setHeader(b"Content-Type", b"text/html; charset=utf-8") + request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) + + request.write(html_bytes) + finish_request(request) + return None + def on_OPTIONS(self, _): return 200, {} -- cgit 1.4.1 From 96071eea8f5e18282c07da3a61e4b3431f694cc5 Mon Sep 17 00:00:00 2001 From: Dionysis Grigoropoulos Date: Mon, 23 Mar 2020 11:48:28 +0200 Subject: Set Referrer-Policy to no-referrer for media (#7009) --- changelog.d/7009.feature | 1 + synapse/rest/media/v1/download_resource.py | 3 +++ 2 files changed, 4 insertions(+) create mode 100644 changelog.d/7009.feature (limited to 'changelog.d') diff --git a/changelog.d/7009.feature b/changelog.d/7009.feature new file mode 100644 index 0000000000..cd2705d5ba --- /dev/null +++ b/changelog.d/7009.feature @@ -0,0 +1 @@ +Set `Referrer-Policy` header to `no-referrer` on media downloads. diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 66a01559e1..24d3ae5bbc 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -50,6 +50,9 @@ class DownloadResource(DirectServeResource): b" media-src 'self';" b" object-src 'self';", ) + request.setHeader( + b"Referrer-Policy", b"no-referrer", + ) server_name, media_id, name = parse_media_id(request) if server_name == self.server_name: await self.media_repo.get_local_media(request, media_id, name) -- cgit 1.4.1 From b3cee0ce670ada582b2a4b36c377f160c7ee1d09 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 23 Mar 2020 11:39:36 +0000 Subject: Fix processing of `groups` stream, and use symbolic names for streams (#7117) `groups` != `receipts` Introduced in #6964 --- changelog.d/7117.bugfix | 1 + synapse/app/generic_worker.py | 35 ++++++++++----- synapse/replication/tcp/streams/__init__.py | 70 +++++++++++++++++++++-------- 3 files changed, 76 insertions(+), 30 deletions(-) create mode 100644 changelog.d/7117.bugfix (limited to 'changelog.d') diff --git a/changelog.d/7117.bugfix b/changelog.d/7117.bugfix new file mode 100644 index 0000000000..1896d7ad49 --- /dev/null +++ b/changelog.d/7117.bugfix @@ -0,0 +1 @@ +Fix a bug which meant that groups updates were not correctly replicated between workers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index cdc078cf11..136babe6ce 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -65,12 +65,23 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams._base import ( +from synapse.replication.tcp.streams import ( + AccountDataStream, DeviceListsStream, + GroupServerStream, + PresenceStream, + PushersStream, + PushRulesStream, ReceiptsStream, + TagAccountDataStream, ToDeviceStream, + TypingStream, +) +from synapse.replication.tcp.streams.events import ( + EventsStream, + EventsStreamEventRow, + EventsStreamRow, ) -from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet @@ -626,7 +637,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler): if self.send_handler: self.send_handler.process_replication_rows(stream_name, token, rows) - if stream_name == "events": + if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. for row in rows: @@ -649,44 +660,44 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler): ) await self.pusher_pool.on_new_notifications(token, token) - elif stream_name == "push_rules": + elif stream_name == PushRulesStream.NAME: self.notifier.on_new_event( "push_rules_key", token, users=[row.user_id for row in rows] ) - elif stream_name in ("account_data", "tag_account_data"): + elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): self.notifier.on_new_event( "account_data_key", token, users=[row.user_id for row in rows] ) - elif stream_name == "receipts": + elif stream_name == ReceiptsStream.NAME: self.notifier.on_new_event( "receipt_key", token, rooms=[row.room_id for row in rows] ) await self.pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} ) - elif stream_name == "typing": + elif stream_name == TypingStream.NAME: self.typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( "typing_key", token, rooms=[row.room_id for row in rows] ) - elif stream_name == "to_device": + elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: self.notifier.on_new_event("to_device_key", token, users=entities) - elif stream_name == "device_lists": + elif stream_name == DeviceListsStream.NAME: all_room_ids = set() for row in rows: if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) - elif stream_name == "presence": + elif stream_name == PresenceStream.NAME: await self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": + elif stream_name == GroupServerStream.NAME: self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows] ) - elif stream_name == "pushers": + elif stream_name == PushersStream.NAME: for row in rows: if row.deleted: self.stop_pusher(row.user_id, row.app_id, row.pushkey) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 5f52264e84..29199f5b46 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -24,27 +24,61 @@ Each stream is defined by the following information: current_token: The function that returns the current token for the stream update_function: The function that returns a list of updates between two tokens """ - -from . import _base, events, federation +from synapse.replication.tcp.streams._base import ( + AccountDataStream, + BackfillStream, + CachesStream, + DeviceListsStream, + GroupServerStream, + PresenceStream, + PublicRoomsStream, + PushersStream, + PushRulesStream, + ReceiptsStream, + TagAccountDataStream, + ToDeviceStream, + TypingStream, + UserSignatureStream, +) +from synapse.replication.tcp.streams.events import EventsStream +from synapse.replication.tcp.streams.federation import FederationStream STREAMS_MAP = { stream.NAME: stream for stream in ( - events.EventsStream, - _base.BackfillStream, - _base.PresenceStream, - _base.TypingStream, - _base.ReceiptsStream, - _base.PushRulesStream, - _base.PushersStream, - _base.CachesStream, - _base.PublicRoomsStream, - _base.DeviceListsStream, - _base.ToDeviceStream, - federation.FederationStream, - _base.TagAccountDataStream, - _base.AccountDataStream, - _base.GroupServerStream, - _base.UserSignatureStream, + EventsStream, + BackfillStream, + PresenceStream, + TypingStream, + ReceiptsStream, + PushRulesStream, + PushersStream, + CachesStream, + PublicRoomsStream, + DeviceListsStream, + ToDeviceStream, + FederationStream, + TagAccountDataStream, + AccountDataStream, + GroupServerStream, + UserSignatureStream, ) } + +__all__ = [ + "STREAMS_MAP", + "BackfillStream", + "PresenceStream", + "TypingStream", + "ReceiptsStream", + "PushRulesStream", + "PushersStream", + "CachesStream", + "PublicRoomsStream", + "DeviceListsStream", + "ToDeviceStream", + "TagAccountDataStream", + "AccountDataStream", + "GroupServerStream", + "UserSignatureStream", +] -- cgit 1.4.1 From a564b92d37625855940fe599c730a9958c33f973 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 23 Mar 2020 13:59:11 +0000 Subject: Convert `*StreamRow` classes to inner classes (#7116) This just helps keep the rows closer to their streams, so that it's easier to see what the format of each stream is. --- changelog.d/7116.misc | 1 + synapse/app/generic_worker.py | 2 +- synapse/federation/send_queue.py | 2 +- synapse/replication/tcp/streams/_base.py | 181 +++++++++++++------------ synapse/replication/tcp/streams/federation.py | 16 +-- tests/replication/tcp/streams/test_receipts.py | 4 +- 6 files changed, 106 insertions(+), 100 deletions(-) create mode 100644 changelog.d/7116.misc (limited to 'changelog.d') diff --git a/changelog.d/7116.misc b/changelog.d/7116.misc new file mode 100644 index 0000000000..89d90bd49e --- /dev/null +++ b/changelog.d/7116.misc @@ -0,0 +1 @@ +Convert `*StreamRow` classes to inner classes. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 136babe6ce..c8fd8909a4 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -804,7 +804,7 @@ class FederationSenderHandler(object): async def _on_new_receipts(self, rows): """ Args: - rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]): + rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]): new receipts to be processed """ for receipt in rows: diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 876fb0e245..e1700ca8aa 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows): Args: transaction_queue (FederationSender) - rows (list(synapse.replication.tcp.streams.FederationStreamRow)) + rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow)) """ # The federation stream contains a bunch of different types of diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index abf5c6c6a8..32d9514883 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -28,94 +28,6 @@ logger = logging.getLogger(__name__) MAX_EVENTS_BEHIND = 500000 -BackfillStreamRow = namedtuple( - "BackfillStreamRow", - ( - "event_id", # str - "room_id", # str - "type", # str - "state_key", # str, optional - "redacts", # str, optional - "relates_to", # str, optional - ), -) -PresenceStreamRow = namedtuple( - "PresenceStreamRow", - ( - "user_id", # str - "state", # str - "last_active_ts", # int - "last_federation_update_ts", # int - "last_user_sync_ts", # int - "status_msg", # str - "currently_active", # bool - ), -) -TypingStreamRow = namedtuple( - "TypingStreamRow", ("room_id", "user_ids") # str # list(str) -) -ReceiptsStreamRow = namedtuple( - "ReceiptsStreamRow", - ( - "room_id", # str - "receipt_type", # str - "user_id", # str - "event_id", # str - "data", # dict - ), -) -PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str -PushersStreamRow = namedtuple( - "PushersStreamRow", - ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool -) - - -@attr.s -class CachesStreamRow: - """Stream to inform workers they should invalidate their cache. - - Attributes: - cache_func: Name of the cached function. - keys: The entry in the cache to invalidate. If None then will - invalidate all. - invalidation_ts: Timestamp of when the invalidation took place. - """ - - cache_func = attr.ib(type=str) - keys = attr.ib(type=Optional[List[Any]]) - invalidation_ts = attr.ib(type=int) - - -PublicRoomsStreamRow = namedtuple( - "PublicRoomsStreamRow", - ( - "room_id", # str - "visibility", # str - "appservice_id", # str, optional - "network_id", # str, optional - ), -) - - -@attr.s -class DeviceListsStreamRow: - entity = attr.ib(type=str) - - -ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str -TagAccountDataStreamRow = namedtuple( - "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict -) -AccountDataStreamRow = namedtuple( - "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str -) -GroupsStreamRow = namedtuple( - "GroupsStreamRow", - ("group_id", "user_id", "type", "content"), # str # str # str # dict -) -UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str - class Stream(object): """Base class for the streams. @@ -234,6 +146,18 @@ class BackfillStream(Stream): or it went from being an outlier to not. """ + BackfillStreamRow = namedtuple( + "BackfillStreamRow", + ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional + "relates_to", # str, optional + ), + ) + NAME = "backfill" ROW_TYPE = BackfillStreamRow @@ -246,6 +170,19 @@ class BackfillStream(Stream): class PresenceStream(Stream): + PresenceStreamRow = namedtuple( + "PresenceStreamRow", + ( + "user_id", # str + "state", # str + "last_active_ts", # int + "last_federation_update_ts", # int + "last_user_sync_ts", # int + "status_msg", # str + "currently_active", # bool + ), + ) + NAME = "presence" ROW_TYPE = PresenceStreamRow @@ -260,6 +197,10 @@ class PresenceStream(Stream): class TypingStream(Stream): + TypingStreamRow = namedtuple( + "TypingStreamRow", ("room_id", "user_ids") # str # list(str) + ) + NAME = "typing" ROW_TYPE = TypingStreamRow @@ -273,6 +214,17 @@ class TypingStream(Stream): class ReceiptsStream(Stream): + ReceiptsStreamRow = namedtuple( + "ReceiptsStreamRow", + ( + "room_id", # str + "receipt_type", # str + "user_id", # str + "event_id", # str + "data", # dict + ), + ) + NAME = "receipts" ROW_TYPE = ReceiptsStreamRow @@ -289,6 +241,8 @@ class PushRulesStream(Stream): """A user has changed their push rules """ + PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str + NAME = "push_rules" ROW_TYPE = PushRulesStreamRow @@ -309,6 +263,11 @@ class PushersStream(Stream): """A user has added/changed/removed a pusher """ + PushersStreamRow = namedtuple( + "PushersStreamRow", + ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool + ) + NAME = "pushers" ROW_TYPE = PushersStreamRow @@ -326,6 +285,21 @@ class CachesStream(Stream): the cache on the workers """ + @attr.s + class CachesStreamRow: + """Stream to inform workers they should invalidate their cache. + + Attributes: + cache_func: Name of the cached function. + keys: The entry in the cache to invalidate. If None then will + invalidate all. + invalidation_ts: Timestamp of when the invalidation took place. + """ + + cache_func = attr.ib(type=str) + keys = attr.ib(type=Optional[List[Any]]) + invalidation_ts = attr.ib(type=int) + NAME = "caches" ROW_TYPE = CachesStreamRow @@ -342,6 +316,16 @@ class PublicRoomsStream(Stream): """The public rooms list changed """ + PublicRoomsStreamRow = namedtuple( + "PublicRoomsStreamRow", + ( + "room_id", # str + "visibility", # str + "appservice_id", # str, optional + "network_id", # str, optional + ), + ) + NAME = "public_rooms" ROW_TYPE = PublicRoomsStreamRow @@ -359,6 +343,10 @@ class DeviceListsStream(Stream): told about a device update. """ + @attr.s + class DeviceListsStreamRow: + entity = attr.ib(type=str) + NAME = "device_lists" ROW_TYPE = DeviceListsStreamRow @@ -375,6 +363,8 @@ class ToDeviceStream(Stream): """New to_device messages for a client """ + ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str + NAME = "to_device" ROW_TYPE = ToDeviceStreamRow @@ -391,6 +381,10 @@ class TagAccountDataStream(Stream): """Someone added/removed a tag for a room """ + TagAccountDataStreamRow = namedtuple( + "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict + ) + NAME = "tag_account_data" ROW_TYPE = TagAccountDataStreamRow @@ -407,6 +401,10 @@ class AccountDataStream(Stream): """Global or per room account data was changed """ + AccountDataStreamRow = namedtuple( + "AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str + ) + NAME = "account_data" ROW_TYPE = AccountDataStreamRow @@ -432,6 +430,11 @@ class AccountDataStream(Stream): class GroupServerStream(Stream): + GroupsStreamRow = namedtuple( + "GroupsStreamRow", + ("group_id", "user_id", "type", "content"), # str # str # str # dict + ) + NAME = "groups" ROW_TYPE = GroupsStreamRow @@ -448,6 +451,8 @@ class UserSignatureStream(Stream): """A user has signed their own device with their user-signing key """ + UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str + NAME = "user_signature" ROW_TYPE = UserSignatureStreamRow diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index 615f3dc9ac..f5f9336430 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -17,20 +17,20 @@ from collections import namedtuple from ._base import Stream -FederationStreamRow = namedtuple( - "FederationStreamRow", - ( - "type", # str, the type of data as defined in the BaseFederationRows - "data", # dict, serialization of a federation.send_queue.BaseFederationRow - ), -) - class FederationStream(Stream): """Data to be sent over federation. Only available when master has federation sending disabled. """ + FederationStreamRow = namedtuple( + "FederationStreamRow", + ( + "type", # str, the type of data as defined in the BaseFederationRows + "data", # dict, serialization of a federation.send_queue.BaseFederationRow + ), + ) + NAME = "federation" ROW_TYPE = FederationStreamRow diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py index d5a99f6caa..fa2493cad6 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.replication.tcp.streams._base import ReceiptsStreamRow +from synapse.replication.tcp.streams._base import ReceiptsStream from tests.replication.tcp.streams._base import BaseStreamTestCase @@ -38,7 +38,7 @@ class ReceiptsStreamTestCase(BaseStreamTestCase): rdata_rows = self.test_handler.received_rdata_rows self.assertEqual(1, len(rdata_rows)) self.assertEqual(rdata_rows[0][0], "receipts") - row = rdata_rows[0][2] # type: ReceiptsStreamRow + row = rdata_rows[0][2] # type: ReceiptsStream.ReceiptsStreamRow self.assertEqual(ROOM_ID, row.room_id) self.assertEqual("m.read", row.receipt_type) self.assertEqual(USER_ID, row.user_id) -- cgit 1.4.1