diff options
-rw-r--r-- | synapse/replication/pusher_resource.py | 53 | ||||
-rw-r--r-- | synapse/replication/resource.py | 7 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 14 | ||||
-rw-r--r-- | tests/replication/slave/storage/test_events.py | 43 |
4 files changed, 114 insertions, 3 deletions
diff --git a/synapse/replication/pusher_resource.py b/synapse/replication/pusher_resource.py new file mode 100644 index 0000000000..b87026d79a --- /dev/null +++ b/synapse/replication/pusher_resource.py @@ -0,0 +1,53 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET +from twisted.internet import defer + + +class PusherResource(Resource): + """ + HTTP endpoint for deleting rejected pushers + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.version_string = hs.version_string + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler + @defer.inlineCallbacks + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + for remove in content["remove"]: + yield self.store.delete_pusher_by_app_id_pushkey_user_id( + remove["app_id"], + remove["push_key"], + remove["user_id"], + ) + + self.notifier.on_new_replication_data() + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index a543af68f8..e5c9a53929 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -15,6 +15,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request +from synapse.replication.pusher_resource import PusherResource from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -102,8 +103,6 @@ class ReplicationResource(Resource): long-polling this replication API for new data on those streams. """ - isLeaf = True - def __init__(self, hs): Resource.__init__(self) # Resource is old-style, so no super() @@ -114,6 +113,8 @@ class ReplicationResource(Resource): self.typing_handler = hs.get_handlers().typing_notification_handler self.notifier = hs.notifier + self.putChild("remove_pushers", PusherResource(hs)) + def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET @@ -343,7 +344,7 @@ class ReplicationResource(Resource): "app_id", "app_display_name", "device_display_name", "pushkey", "ts", "lang", "data" )) - writer.write_header_and_rows("deleted", deleted, ( + writer.write_header_and_rows("deleted_pushers", deleted, ( "position", "user_id", "app_id", "pushkey" )) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 82f171c257..5f37ba6995 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -21,6 +21,7 @@ from synapse.storage import DataStore from synapse.storage.room import RoomStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore +from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.state import StateStore from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -71,7 +72,16 @@ class SlavedEventStore(BaseSlavedStore): get_invited_rooms_for_user = RoomMemberStore.__dict__[ "get_invited_rooms_for_user" ] + get_unread_event_push_actions_by_room_for_user = ( + EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] + ) + get_unread_push_actions_for_user_in_range = ( + DataStore.get_unread_push_actions_for_user_in_range.__func__ + ) + get_push_action_users_in_range = ( + DataStore.get_push_action_users_in_range.__func__ + ) get_event = DataStore.get_event.__func__ get_current_state = DataStore.get_current_state.__func__ get_current_state_for_key = DataStore.get_current_state_for_key.__func__ @@ -167,6 +177,10 @@ class SlavedEventStore(BaseSlavedStore): self.get_latest_event_ids_in_room.invalidate((event.room_id,)) + self.get_unread_event_push_actions_by_room_for_user.invalidate_many( + (event.room_id,) + ) + if not backfilled: self._events_stream_cache.entity_has_changed( event.room_id, event.internal_metadata.stream_ordering diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 41a626cf70..17587fda00 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -266,6 +266,47 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): event.internal_metadata.stream_ordering )]) + @defer.inlineCallbacks + def test_push_actions_for_user(self): + yield self.persist(type="m.room.create", creator=USER_ID) + yield self.persist(type="m.room.join", key=USER_ID, membership="join") + yield self.persist( + type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join" + ) + event1 = yield self.persist( + type="m.room.message", msgtype="m.text", body="hello" + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 0, "notify_count": 0} + ) + + yield self.persist( + type="m.room.message", msgtype="m.text", body="world", + push_actions=[(USER_ID_2, ["notify"])], + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 0, "notify_count": 1} + ) + + yield self.persist( + type="m.room.message", msgtype="m.text", body="world", + push_actions=[(USER_ID_2, [ + "notify", {"set_tweak": "highlight", "value": True} + ])], + ) + yield self.replicate() + yield self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 1, "notify_count": 2} + ) + event_id = 0 @defer.inlineCallbacks @@ -273,6 +314,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self, sender=USER_ID, room_id=ROOM_ID, type={}, key=None, internal={}, state=None, reset_state=False, backfill=False, depth=None, prev_events=[], auth_events=[], prev_state=[], redacts=None, + push_actions=[], **content ): """ @@ -305,6 +347,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.event_id += 1 context = EventContext(current_state=state) + context.push_actions = push_actions ordering = None if backfill: |