diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-05-17 17:09:25 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-05-17 17:09:25 +0100 |
commit | 8e44e34ed51591fb4c170ca6991610ae4e5b4b62 (patch) | |
tree | 6d48ed2e4327c592b89eaca1119fb15062ef4d2a | |
parent | Merge branch 'markjh/liberate_typing_handler' into markjh/synchrotron (diff) | |
download | synapse-8e44e34ed51591fb4c170ca6991610ae4e5b4b62.tar.xz |
Split out the /sync handler to a separate process
-rw-r--r-- | synapse/app/synchrotron.py | 20 | ||||
-rw-r--r-- | synapse/replication/slave/storage/account_data.py | 39 | ||||
-rw-r--r-- | synapse/replication/slave/storage/appservice.py | 1 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 10 | ||||
-rw-r--r-- | synapse/replication/slave/storage/push_rule.py | 13 | ||||
-rw-r--r-- | synapse/replication/slave/storage/receipts.py | 11 | ||||
-rw-r--r-- | synapse/replication/slave/storage/registration.py | 2 |
7 files changed, 92 insertions, 4 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 865c7f34e7..a46edd3e7c 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -107,6 +107,14 @@ class SynchrotronSlavedStore( SlavedFilteringStore, ): def get_current_presence_token(self): + return 0 + + presence_stream_cache = () + + def get_presence_list_accepted(self, user_localpart): + return () + + def insert_client_ip(self, user, access_token, ip, user_agent): pass @@ -114,10 +122,21 @@ class SynchrotronPresence(object): def set_state(self, user, state): pass + def get_states(self, user_ids, as_event=False): + return {} + @contextlib.contextmanager def user_syncing(self, user, affect_presence): yield + def current_state_for_users(self, user_ids): + return {} + + +class SynchrotronTyping(object): + _latest_room_serial = 0 + _room_serials = () + class SynchrotronServer(HomeServer): def get_db_conn(self, run_new_connection=True): @@ -210,6 +229,7 @@ def setup(config_options): version_string=get_version_string("Synapse", synapse), database_engine=database_engine, presence_handler=SynchrotronPresence(), + typing_handler=SynchrotronTyping(), ) ss.setup() diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index f59b0eabbc..5a44d314a3 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -15,7 +15,10 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore from synapse.storage.account_data import AccountDataStore +from synapse.storage.tags import TagsStore +from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedAccountDataStore(BaseSlavedStore): @@ -25,6 +28,14 @@ class SlavedAccountDataStore(BaseSlavedStore): self._account_data_id_gen = SlavedIdTracker( db_conn, "account_data_max_stream_id", "stream_id", ) + self._account_data_stream_cache = StreamChangeCache( + "AccountDataAndTagsChangeCache", + self._account_data_id_gen.get_current_token(), + ) + + get_account_data_for_user = ( + AccountDataStore.__dict__["get_account_data_for_user"] + ) get_global_account_data_by_type_for_users = ( AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] @@ -34,6 +45,16 @@ class SlavedAccountDataStore(BaseSlavedStore): AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] ) + get_tags_for_user = TagsStore.__dict__["get_tags_for_user"] + + get_updated_tags = DataStore.get_updated_tags.__func__ + get_updated_account_data_for_user = ( + DataStore.get_updated_account_data_for_user.__func__ + ) + + def get_max_account_data_stream_id(self): + return self._account_data_id_gen.get_current_token() + def stream_positions(self): result = super(SlavedAccountDataStore, self).stream_positions() position = self._account_data_id_gen.get_current_token() @@ -47,15 +68,31 @@ class SlavedAccountDataStore(BaseSlavedStore): if stream: self._account_data_id_gen.advance(int(stream["position"])) for row in stream["rows"]: - user_id, data_type = row[1:3] + position, user_id, data_type = row[:3] self.get_global_account_data_by_type_for_user.invalidate( (data_type, user_id,) ) + self.get_account_data_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) stream = result.get("room_account_data") if stream: self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.get_account_data_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) stream = result.get("tag_account_data") if stream: self._account_data_id_gen.advance(int(stream["position"])) + for row in stream["rows"]: + position, user_id = row[:2] + self.get_tags_for_user.invalidate((user_id,)) + self._account_data_stream_cache.entity_has_changed( + user_id, position + ) diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py index c0321ecf34..25792d9429 100644 --- a/synapse/replication/slave/storage/appservice.py +++ b/synapse/replication/slave/storage/appservice.py @@ -27,3 +27,4 @@ class SlavedApplicationServiceStore(BaseSlavedStore): ) get_app_service_by_token = DataStore.get_app_service_by_token.__func__ + get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index c0d741452d..c35192d0d1 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.state import StateStore +from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache import ujson as json @@ -87,6 +88,9 @@ class SlavedEventStore(BaseSlavedStore): _get_state_group_from_group = ( StateStore.__dict__["_get_state_group_from_group"] ) + get_recent_event_ids_for_room = ( + StreamStore.__dict__["get_recent_event_ids_for_room"] + ) get_unread_push_actions_for_user_in_range = ( DataStore.get_unread_push_actions_for_user_in_range.__func__ @@ -111,8 +115,12 @@ class SlavedEventStore(BaseSlavedStore): get_events_around = DataStore.get_events_around.__func__ get_state_for_events = DataStore.get_state_for_events.__func__ get_state_groups = DataStore.get_state_groups.__func__ + get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__ + get_room_events_stream_for_rooms = ( + DataStore.get_room_events_stream_for_rooms.__func__ + ) - _set_before_and_after = DataStore._set_before_and_after + _set_before_and_after = staticmethod(DataStore._set_before_and_after) _get_events = DataStore._get_events.__func__ _get_events_from_cache = DataStore._get_events_from_cache.__func__ diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 56796d9bd4..86fec353d3 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -15,7 +15,9 @@ from .events import SlavedEventStore from ._slaved_id_tracker import SlavedIdTracker +from synapse.storage import DataStore from synapse.storage.push_rule import PushRuleStore +from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedPushRuleStore(SlavedEventStore): @@ -24,8 +26,18 @@ class SlavedPushRuleStore(SlavedEventStore): self._push_rules_stream_id_gen = SlavedIdTracker( db_conn, "push_rules_stream", "stream_id", ) + self.push_rules_stream_cache = StreamChangeCache( + "PushRulesStreamChangeCache", + self._push_rules_stream_id_gen.get_current_token(), + ) get_push_rules_for_user = PushRuleStore.__dict__["get_push_rules_for_user"] + get_push_rules_enabled_for_user = ( + PushRuleStore.__dict__["get_push_rules_enabled_for_user"] + ) + have_push_rules_changed_for_user = ( + DataStore.have_push_rules_changed_for_user.__func__ + ) def get_push_rules_stream_token(self): return ( @@ -44,6 +56,7 @@ class SlavedPushRuleStore(SlavedEventStore): for row in stream["rows"]: user_id = row[1] self.get_push_rules_for_user.invalidate((user_id,)) + self.get_push_rules_enabled_for_user.invalidate((user_id,)) self._push_rules_stream_id_gen.advance(int(stream["position"])) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index ec007516d0..f88b931f8c 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -38,10 +38,20 @@ class SlavedReceiptsStore(BaseSlavedStore): ) get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"] + get_linearized_receipts_for_room = ( + ReceiptsStore.__dict__["get_linearized_receipts_for_room"] + ) + _get_linearized_receipts_for_rooms = ( + ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"] + ) get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__ get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__ + get_linearized_receipts_for_rooms = ( + DataStore.get_linearized_receipts_for_rooms.__func__ + ) + def stream_positions(self): result = super(SlavedReceiptsStore, self).stream_positions() result["receipts"] = self._receipts_id_gen.get_current_token() @@ -59,3 +69,4 @@ class SlavedReceiptsStore(BaseSlavedStore): def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) + self.get_linearized_receipts_for_room((room_id,)) diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py index 8f39e4f7aa..e2d2e60d42 100644 --- a/synapse/replication/slave/storage/registration.py +++ b/synapse/replication/slave/storage/registration.py @@ -27,6 +27,4 @@ class SlavedRegistrationStore(BaseSlavedStore): "get_user_by_access_token" ] - insert_client_ip = DataStore.insert_client_ip.__func__ - _query_for_auth = DataStore._query_for_auth.__func__ |