diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index d78c79e0f9..d751c9772b 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -26,7 +26,6 @@ from twisted.web.resource import NoResource
import synapse
import synapse.events
-from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.api.urls import (
CLIENT_API_PREFIX,
@@ -48,6 +47,7 @@ from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -81,11 +81,6 @@ from synapse.replication.tcp.streams import (
ToDeviceStream,
TypingStream,
)
-from synapse.replication.tcp.streams.events import (
- EventsStream,
- EventsStreamEventRow,
- EventsStreamRow,
-)
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -122,11 +117,13 @@ from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.censor_events import CensorEventsStore
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.presence import UserPresenceState
+from synapse.storage.data_stores.main.search import SearchWorkerStore
from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
@@ -429,6 +426,7 @@ class GenericWorkerSlavedStore(
SlavedGroupServerStore,
SlavedAccountDataStore,
SlavedPusherStore,
+ CensorEventsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
@@ -442,6 +440,7 @@ class GenericWorkerSlavedStore(
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
+ SearchWorkerStore,
BaseSlavedStore,
):
def __init__(self, database, db_conn, hs):
@@ -559,6 +558,9 @@ class GenericWorkerServer(HomeServer):
if name in ["keys", "federation"]:
resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
+ if name == "replication":
+ resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+
root_resource = create_resource_tree(resources, NoResource())
_base.listen_tcp(
@@ -618,7 +620,7 @@ class GenericWorkerServer(HomeServer):
class GenericWorkerReplicationHandler(ReplicationDataHandler):
def __init__(self, hs):
- super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore())
+ super(GenericWorkerReplicationHandler, self).__init__(hs)
self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
@@ -644,30 +646,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
stream_name, token, rows
)
- if stream_name == EventsStream.NAME:
- # We shouldn't get multiple rows per token for events stream, so
- # we don't need to optimise this for multiple rows.
- for row in rows:
- if row.type != EventsStreamEventRow.TypeId:
- continue
- assert isinstance(row, EventsStreamRow)
-
- event = await self.store.get_event(
- row.data.event_id, allow_rejected=True
- )
- if event.rejected_reason:
- continue
-
- extra_users = ()
- if event.type == EventTypes.Member:
- extra_users = (event.state_key,)
- max_token = self.store.get_room_max_stream_ordering()
- self.notifier.on_new_room_event(
- event, token, max_token, extra_users
- )
-
- await self.pusher_pool.on_new_notifications(token, token)
- elif stream_name == PushRulesStream.NAME:
+ if stream_name == PushRulesStream.NAME:
self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows]
)
|