diff options
author | Erik Johnston <erik@matrix.org> | 2018-11-26 16:20:40 +0000 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-02-13 15:16:03 +0000 |
commit | 6284acf910810ebf29dda1fda94f89549e8e89d1 (patch) | |
tree | e12296a8edce8a61135cb9edc51d90148fea7d2d | |
parent | Add timestamp lookup API (diff) | |
download | synapse-6284acf910810ebf29dda1fda94f89549e8e89d1.tar.xz |
Add API to force new threads
-rw-r--r-- | synapse/federation/federation_server.py | 3 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 14 | ||||
-rw-r--r-- | synapse/rest/client/v1/admin.py | 22 | ||||
-rw-r--r-- | synapse/storage/schema/delta/52/add_threa_index.sql | 16 | ||||
-rw-r--r-- | synapse/storage/schema/delta/52/thread_id2.sql (renamed from synapse/storage/schema/delta/52/thread_id.sql) | 7 |
5 files changed, 40 insertions, 22 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index eb337007f1..efb1360edb 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -213,8 +213,7 @@ class FederationServer(FederationBase): pdu_to_thread = {} first_in_thread = True for pdu in reversed(pdus_by_room[room_id]): - now = self.clock.time_msec() - if now - pdu.origin_server_ts > 1 * 60 * 1000: + if self.handler.should_start_thread(pdu): pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) first_in_thread = False else: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 651d640bf9..5053038d02 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -137,6 +137,10 @@ class FederationHandler(BaseHandler): self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") + # Always start a new thread for events that have an origin_server_ts + # from before this + self.force_thread_ts = 0 + @defer.inlineCallbacks def on_receive_pdu( self, origin, pdu, sent_to_us_directly=False, thread_id=None, @@ -567,8 +571,7 @@ class FederationHandler(BaseHandler): thread_id = random.randint(1, 999999999) first_in_thread = True for pdu in reversed(missing_events): - now = self.clock.time_msec() - if now - pdu.origin_server_ts > 1 * 60 * 1000: + if self.should_start_thread(pdu): pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) first_in_thread = False else: @@ -2641,3 +2644,10 @@ class FederationHandler(BaseHandler): ) else: return user_joined_room(self.distributor, user, room_id) + + def should_start_thread(self, event): + now = self.clock.time_msec() + forced = event.origin_server_ts <= self.force_thread_ts + old = now - event.origin_server_ts > 1 * 60 * 1000 + + return forced or old diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 50bfd0bb9e..1913230799 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -828,6 +828,27 @@ class ServerHealth(ClientV1RestServlet): defer.returnValue((200, {})) +class ForceThreadServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/admin/force_thread") + + def __init__(self, hs): + super(ForceThreadServlet, self).__init__(hs) + self.federation_handler = hs.get_handlers().federation_handler + self.clock = hs.get_clock() + + def on_GET(self, request): + return self.do_force_thread() + + def on_POST(self, request): + return self.do_force_thread() + + @defer.inlineCallbacks + def do_force_thread(self): + yield self.clock.sleep(0) + self.federation_handler.force_thread_ts = self.clock.time_msec() + defer.returnValue((200, {})) + + def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) PurgeMediaCacheRestServlet(hs).register(http_server) @@ -843,3 +864,4 @@ def register_servlets(hs, http_server): ListMediaInRoom(hs).register(http_server) UserRegisterServlet(hs).register(http_server) ServerHealth(hs).register(http_server) + ForceThreadServlet(hs).register(http_server) diff --git a/synapse/storage/schema/delta/52/add_threa_index.sql b/synapse/storage/schema/delta/52/add_threa_index.sql deleted file mode 100644 index 16c4b26e69..0000000000 --- a/synapse/storage/schema/delta/52/add_threa_index.sql +++ /dev/null @@ -1,16 +0,0 @@ -/* Copyright 2018 New Vector Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE INDEX event_room_thread_ts ON events (room_id, thread_id, origin_server_ts); diff --git a/synapse/storage/schema/delta/52/thread_id.sql b/synapse/storage/schema/delta/52/thread_id2.sql index 845529fdcd..aa03b6e49d 100644 --- a/synapse/storage/schema/delta/52/thread_id.sql +++ b/synapse/storage/schema/delta/52/thread_id2.sql @@ -13,8 +13,11 @@ * limitations under the License. */ -ALTER TABLE events ADD COLUMN thread_id BIGINT NOT NULL DEFAULT 0; +ALTER TABLE events ADD COLUMN IF NOT EXISTS thread_id BIGINT NOT NULL DEFAULT 0; -CREATE INDEX events_room_idx ON events (room_id, thread_id); +CREATE INDEX IF NOT EXISTS events_room_idx ON events (room_id, thread_id); -- CREATE SEQUENCE thread_id_seq; + + +CREATE INDEX IF NOT EXISTS event_room_thread_ts ON events (room_id, thread_id, origin_server_ts); |