author: Erik Johnston <erik@matrix.org>  2020-05-14 14:01:39 +0100
committer: GitHub <noreply@github.com>  2020-05-14 14:01:39 +0100
commit: 4734a7bbe4d08d68c5f04dd76cd5bcfb4cd9b6be
tree: e01a3c3e134699ac16e402301906b9f62cdd263a /synapse/app
parent: Add `instance_map` config and route replication calls (#7495)
download: synapse-4734a7bbe4d08d68c5f04dd76cd5bcfb4cd9b6be.tar.xz
Move EventStream handling into default ReplicationDataHandler (#7493)
This is so that the logic can run on both the master and the workers when we move event persistence out.
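For a concrete picture of what that move implies, here is a rough sketch of the event-stream handling living in the base ReplicationDataHandler rather than in the worker subclass. It is reassembled from the block removed from generic_worker.py in the diff below, not copied from the actual destination module (which is outside the synapse/app diffstat shown here); the constructor shape and the HomeServer accessors used in `__init__` (get_notifier, get_pusherpool) are assumptions based on the change to the subclass constructor.

```python
# Sketch only: a ReplicationDataHandler that does the event-stream fan-out
# itself, so the same code path runs on the master and on every worker.
# Reassembled from the block removed from generic_worker.py below; the real
# relocated implementation lives outside synapse/app and is not shown here.
from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams.events import (
    EventsStream,
    EventsStreamEventRow,
    EventsStreamRow,
)


class ReplicationDataHandler:
    def __init__(self, hs):
        # Taking the whole HomeServer (rather than just the datastore) gives
        # the base class access to the notifier and pusher pool it now needs.
        self.store = hs.get_datastore()
        self.notifier = hs.get_notifier()
        self.pusher_pool = hs.get_pusherpool()

    async def on_rdata(self, stream_name, token, rows):
        """Called by the replication layer when rows arrive on a stream."""
        if stream_name == EventsStream.NAME:
            # One row per token is expected on the events stream, so no
            # batching optimisation is needed here.
            for row in rows:
                if row.type != EventsStreamEventRow.TypeId:
                    continue
                assert isinstance(row, EventsStreamRow)

                event = await self.store.get_event(
                    row.data.event_id, allow_rejected=True
                )
                if event.rejected_reason:
                    continue

                extra_users = ()
                if event.type == EventTypes.Member:
                    extra_users = (event.state_key,)

                max_token = self.store.get_room_max_stream_ordering()
                self.notifier.on_new_room_event(event, token, max_token, extra_users)

            await self.pusher_pool.on_new_notifications(token, token)
```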
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/generic_worker.py | 33 |
1 file changed, 2 insertions, 31 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index bccb1140b2..2e3add7ac5 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -26,7 +26,6 @@ from twisted.web.resource import NoResource
 
 import synapse
 import synapse.events
-from synapse.api.constants import EventTypes
 from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.api.urls import (
     CLIENT_API_PREFIX,
@@ -81,11 +80,6 @@ from synapse.replication.tcp.streams import (
     ToDeviceStream,
     TypingStream,
 )
-from synapse.replication.tcp.streams.events import (
-    EventsStream,
-    EventsStreamEventRow,
-    EventsStreamRow,
-)
 from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -633,7 +627,7 @@ class GenericWorkerServer(HomeServer):
 
 class GenericWorkerReplicationHandler(ReplicationDataHandler):
     def __init__(self, hs):
-        super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore())
+        super(GenericWorkerReplicationHandler, self).__init__(hs)
 
         self.store = hs.get_datastore()
         self.typing_handler = hs.get_typing_handler()
@@ -659,30 +653,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
             stream_name, token, rows
         )
 
-        if stream_name == EventsStream.NAME:
-            # We shouldn't get multiple rows per token for events stream, so
-            # we don't need to optimise this for multiple rows.
-            for row in rows:
-                if row.type != EventsStreamEventRow.TypeId:
-                    continue
-                assert isinstance(row, EventsStreamRow)
-
-                event = await self.store.get_event(
-                    row.data.event_id, allow_rejected=True
-                )
-                if event.rejected_reason:
-                    continue
-
-                extra_users = ()
-                if event.type == EventTypes.Member:
-                    extra_users = (event.state_key,)
-                max_token = self.store.get_room_max_stream_ordering()
-                self.notifier.on_new_room_event(
-                    event, token, max_token, extra_users
-                )
-
-            await self.pusher_pool.on_new_notifications(token, token)
-        elif stream_name == PushRulesStream.NAME:
+        if stream_name == PushRulesStream.NAME:
             self.notifier.on_new_event(
                 "push_rules_key", token, users=[row.user_id for row in rows]
             )
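On the worker side, the only constructor change is that GenericWorkerReplicationHandler now hands the whole HomeServer to the base class instead of just the datastore, presumably because the base handler now needs the notifier and pusher pool as well as the store. A hypothetical minimal subclass following that pattern (the class name and retained attribute here are illustrative, not from the source):

```python
# Hypothetical minimal subclass mirroring the pattern in the diff above:
# pass the HomeServer to the base class and keep only worker-specific state.
class MyWorkerReplicationHandler(ReplicationDataHandler):
    def __init__(self, hs):
        super().__init__(hs)  # was: super().__init__(hs.get_datastore())
        self.typing_handler = hs.get_typing_handler()
```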