diff --git a/changelog.d/7117.bugfix b/changelog.d/7117.bugfix
new file mode 100644
index 0000000000..1896d7ad49
--- /dev/null
+++ b/changelog.d/7117.bugfix
@@ -0,0 +1 @@
+Fix a bug which meant that groups updates were not correctly replicated between workers.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index cdc078cf11..136babe6ce 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -65,12 +65,23 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams._base import (
+from synapse.replication.tcp.streams import (
+ AccountDataStream,
DeviceListsStream,
+ GroupServerStream,
+ PresenceStream,
+ PushersStream,
+ PushRulesStream,
ReceiptsStream,
+ TagAccountDataStream,
ToDeviceStream,
+ TypingStream,
+)
+from synapse.replication.tcp.streams.events import (
+ EventsStream,
+ EventsStreamEventRow,
+ EventsStreamRow,
)
-from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -626,7 +637,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
if self.send_handler:
self.send_handler.process_replication_rows(stream_name, token, rows)
- if stream_name == "events":
+ if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
@@ -649,44 +660,44 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
)
await self.pusher_pool.on_new_notifications(token, token)
- elif stream_name == "push_rules":
+ elif stream_name == PushRulesStream.NAME:
self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows]
)
- elif stream_name in ("account_data", "tag_account_data"):
+ elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
self.notifier.on_new_event(
"account_data_key", token, users=[row.user_id for row in rows]
)
- elif stream_name == "receipts":
+ elif stream_name == ReceiptsStream.NAME:
self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows]
)
await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
- elif stream_name == "typing":
+ elif stream_name == TypingStream.NAME:
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)
- elif stream_name == "to_device":
+ elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
self.notifier.on_new_event("to_device_key", token, users=entities)
- elif stream_name == "device_lists":
+ elif stream_name == DeviceListsStream.NAME:
all_room_ids = set()
for row in rows:
if row.entity.startswith("@"):
room_ids = await self.store.get_rooms_for_user(row.entity)
all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
- elif stream_name == "presence":
+ elif stream_name == PresenceStream.NAME:
await self.presence_handler.process_replication_rows(token, rows)
- elif stream_name == "receipts":
+ elif stream_name == GroupServerStream.NAME:
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]
)
- elif stream_name == "pushers":
+ elif stream_name == PushersStream.NAME:
for row in rows:
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 5f52264e84..29199f5b46 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -24,27 +24,61 @@ Each stream is defined by the following information:
current_token: The function that returns the current token for the stream
update_function: The function that returns a list of updates between two tokens
"""
-
-from . import _base, events, federation
+from synapse.replication.tcp.streams._base import (
+ AccountDataStream,
+ BackfillStream,
+ CachesStream,
+ DeviceListsStream,
+ GroupServerStream,
+ PresenceStream,
+ PublicRoomsStream,
+ PushersStream,
+ PushRulesStream,
+ ReceiptsStream,
+ TagAccountDataStream,
+ ToDeviceStream,
+ TypingStream,
+ UserSignatureStream,
+)
+from synapse.replication.tcp.streams.events import EventsStream
+from synapse.replication.tcp.streams.federation import FederationStream
STREAMS_MAP = {
stream.NAME: stream
for stream in (
- events.EventsStream,
- _base.BackfillStream,
- _base.PresenceStream,
- _base.TypingStream,
- _base.ReceiptsStream,
- _base.PushRulesStream,
- _base.PushersStream,
- _base.CachesStream,
- _base.PublicRoomsStream,
- _base.DeviceListsStream,
- _base.ToDeviceStream,
- federation.FederationStream,
- _base.TagAccountDataStream,
- _base.AccountDataStream,
- _base.GroupServerStream,
- _base.UserSignatureStream,
+ EventsStream,
+ BackfillStream,
+ PresenceStream,
+ TypingStream,
+ ReceiptsStream,
+ PushRulesStream,
+ PushersStream,
+ CachesStream,
+ PublicRoomsStream,
+ DeviceListsStream,
+ ToDeviceStream,
+ FederationStream,
+ TagAccountDataStream,
+ AccountDataStream,
+ GroupServerStream,
+ UserSignatureStream,
)
}
+
+__all__ = [
+ "STREAMS_MAP",
+ "BackfillStream",
+ "PresenceStream",
+ "TypingStream",
+ "ReceiptsStream",
+ "PushRulesStream",
+ "PushersStream",
+ "CachesStream",
+ "PublicRoomsStream",
+ "DeviceListsStream",
+ "ToDeviceStream",
+ "TagAccountDataStream",
+ "AccountDataStream",
+ "GroupServerStream",
+ "UserSignatureStream",
+]
|