From 6284acf910810ebf29dda1fda94f89549e8e89d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Nov 2018 16:20:40 +0000 Subject: Add API to force new threads --- synapse/federation/federation_server.py | 3 +-- synapse/handlers/federation.py | 14 +++++++++++-- synapse/rest/client/v1/admin.py | 22 +++++++++++++++++++++ .../storage/schema/delta/52/add_threa_index.sql | 16 --------------- synapse/storage/schema/delta/52/thread_id.sql | 20 ------------------- synapse/storage/schema/delta/52/thread_id2.sql | 23 ++++++++++++++++++++++ 6 files changed, 58 insertions(+), 40 deletions(-) delete mode 100644 synapse/storage/schema/delta/52/add_threa_index.sql delete mode 100644 synapse/storage/schema/delta/52/thread_id.sql create mode 100644 synapse/storage/schema/delta/52/thread_id2.sql diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index eb337007f1..efb1360edb 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -213,8 +213,7 @@ class FederationServer(FederationBase): pdu_to_thread = {} first_in_thread = True for pdu in reversed(pdus_by_room[room_id]): - now = self.clock.time_msec() - if now - pdu.origin_server_ts > 1 * 60 * 1000: + if self.handler.should_start_thread(pdu): pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) first_in_thread = False else: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 651d640bf9..5053038d02 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -137,6 +137,10 @@ class FederationHandler(BaseHandler): self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") + # Always start a new thread for events that have an origin_server_ts + # from before this + self.force_thread_ts = 0 + @defer.inlineCallbacks def on_receive_pdu( self, origin, pdu, sent_to_us_directly=False, thread_id=None, @@ -567,8 +571,7 @@ class FederationHandler(BaseHandler): thread_id = random.randint(1, 999999999) first_in_thread = True for pdu in reversed(missing_events): - now = self.clock.time_msec() - if now - pdu.origin_server_ts > 1 * 60 * 1000: + if self.should_start_thread(pdu): pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) first_in_thread = False else: @@ -2641,3 +2644,10 @@ class FederationHandler(BaseHandler): ) else: return user_joined_room(self.distributor, user, room_id) + + def should_start_thread(self, event): + now = self.clock.time_msec() + forced = event.origin_server_ts <= self.force_thread_ts + old = now - event.origin_server_ts > 1 * 60 * 1000 + + return forced or old diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 50bfd0bb9e..1913230799 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -828,6 +828,27 @@ class ServerHealth(ClientV1RestServlet): defer.returnValue((200, {})) +class ForceThreadServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/admin/force_thread") + + def __init__(self, hs): + super(ForceThreadServlet, self).__init__(hs) + self.federation_handler = hs.get_handlers().federation_handler + self.clock = hs.get_clock() + + def on_GET(self, request): + return self.do_force_thread() + + def on_POST(self, request): + return self.do_force_thread() + + @defer.inlineCallbacks + def do_force_thread(self): + yield self.clock.sleep(0) + self.federation_handler.force_thread_ts = self.clock.time_msec() + defer.returnValue((200, {})) + + def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) PurgeMediaCacheRestServlet(hs).register(http_server) @@ -843,3 +864,4 @@ def register_servlets(hs, http_server): ListMediaInRoom(hs).register(http_server) UserRegisterServlet(hs).register(http_server) ServerHealth(hs).register(http_server) + ForceThreadServlet(hs).register(http_server) diff --git a/synapse/storage/schema/delta/52/add_threa_index.sql b/synapse/storage/schema/delta/52/add_threa_index.sql deleted file mode 100644 index 16c4b26e69..0000000000 --- a/synapse/storage/schema/delta/52/add_threa_index.sql +++ /dev/null @@ -1,16 +0,0 @@ -/* Copyright 2018 New Vector Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE INDEX event_room_thread_ts ON events (room_id, thread_id, origin_server_ts); diff --git a/synapse/storage/schema/delta/52/thread_id.sql b/synapse/storage/schema/delta/52/thread_id.sql deleted file mode 100644 index 845529fdcd..0000000000 --- a/synapse/storage/schema/delta/52/thread_id.sql +++ /dev/null @@ -1,20 +0,0 @@ -/* Copyright 2018 New Vector Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -ALTER TABLE events ADD COLUMN thread_id BIGINT NOT NULL DEFAULT 0; - -CREATE INDEX events_room_idx ON events (room_id, thread_id); - --- CREATE SEQUENCE thread_id_seq; diff --git a/synapse/storage/schema/delta/52/thread_id2.sql b/synapse/storage/schema/delta/52/thread_id2.sql new file mode 100644 index 0000000000..aa03b6e49d --- /dev/null +++ b/synapse/storage/schema/delta/52/thread_id2.sql @@ -0,0 +1,23 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN IF NOT EXISTS thread_id BIGINT NOT NULL DEFAULT 0; + +CREATE INDEX IF NOT EXISTS events_room_idx ON events (room_id, thread_id); + +-- CREATE SEQUENCE thread_id_seq; + + +CREATE INDEX IF NOT EXISTS event_room_thread_ts ON events (room_id, thread_id, origin_server_ts); -- cgit 1.5.1