summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-11-26 16:20:40 +0000
committerBrendan Abolivier <babolivier@matrix.org>2019-02-13 15:16:03 +0000
commit6284acf910810ebf29dda1fda94f89549e8e89d1 (patch)
treee12296a8edce8a61135cb9edc51d90148fea7d2d
parentAdd timestamp lookup API (diff)
downloadsynapse-6284acf910810ebf29dda1fda94f89549e8e89d1.tar.xz
Add API to force new threads
-rw-r--r--synapse/federation/federation_server.py3
-rw-r--r--synapse/handlers/federation.py14
-rw-r--r--synapse/rest/client/v1/admin.py22
-rw-r--r--synapse/storage/schema/delta/52/add_threa_index.sql16
-rw-r--r--synapse/storage/schema/delta/52/thread_id2.sql (renamed from synapse/storage/schema/delta/52/thread_id.sql)7
5 files changed, 40 insertions, 22 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index eb337007f1..efb1360edb 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -213,8 +213,7 @@ class FederationServer(FederationBase):
             pdu_to_thread = {}
             first_in_thread = True
             for pdu in reversed(pdus_by_room[room_id]):
-                now = self.clock.time_msec()
-                if now - pdu.origin_server_ts > 1 * 60 * 1000:
+                if self.handler.should_start_thread(pdu):
                     pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
                     first_in_thread = False
                 else:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 651d640bf9..5053038d02 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -137,6 +137,10 @@ class FederationHandler(BaseHandler):
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
+        # Always start a new thread for events that have an origin_server_ts
+        # from before this
+        self.force_thread_ts = 0
+
     @defer.inlineCallbacks
     def on_receive_pdu(
             self, origin, pdu, sent_to_us_directly=False, thread_id=None,
@@ -567,8 +571,7 @@ class FederationHandler(BaseHandler):
             thread_id = random.randint(1, 999999999)
             first_in_thread = True
             for pdu in reversed(missing_events):
-                now = self.clock.time_msec()
-                if now - pdu.origin_server_ts > 1 * 60 * 1000:
+                if self.should_start_thread(pdu):
                     pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
                     first_in_thread = False
                 else:
@@ -2641,3 +2644,10 @@ class FederationHandler(BaseHandler):
             )
         else:
             return user_joined_room(self.distributor, user, room_id)
+
+    def should_start_thread(self, event):
+        now = self.clock.time_msec()
+        forced = event.origin_server_ts <= self.force_thread_ts
+        old = now - event.origin_server_ts > 1 * 60 * 1000
+
+        return forced or old
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 50bfd0bb9e..1913230799 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -828,6 +828,27 @@ class ServerHealth(ClientV1RestServlet):
         defer.returnValue((200, {}))
 
 
+class ForceThreadServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/admin/force_thread")
+
+    def __init__(self, hs):
+        super(ForceThreadServlet, self).__init__(hs)
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.clock = hs.get_clock()
+
+    def on_GET(self, request):
+        return self.do_force_thread()
+
+    def on_POST(self, request):
+        return self.do_force_thread()
+
+    @defer.inlineCallbacks
+    def do_force_thread(self):
+        yield self.clock.sleep(0)
+        self.federation_handler.force_thread_ts = self.clock.time_msec()
+        defer.returnValue((200, {}))
+
+
 def register_servlets(hs, http_server):
     WhoisRestServlet(hs).register(http_server)
     PurgeMediaCacheRestServlet(hs).register(http_server)
@@ -843,3 +864,4 @@ def register_servlets(hs, http_server):
     ListMediaInRoom(hs).register(http_server)
     UserRegisterServlet(hs).register(http_server)
     ServerHealth(hs).register(http_server)
+    ForceThreadServlet(hs).register(http_server)
diff --git a/synapse/storage/schema/delta/52/add_threa_index.sql b/synapse/storage/schema/delta/52/add_threa_index.sql
deleted file mode 100644
index 16c4b26e69..0000000000
--- a/synapse/storage/schema/delta/52/add_threa_index.sql
+++ /dev/null
@@ -1,16 +0,0 @@
-/* Copyright 2018 New Vector Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-CREATE INDEX event_room_thread_ts ON events (room_id, thread_id, origin_server_ts);
diff --git a/synapse/storage/schema/delta/52/thread_id.sql b/synapse/storage/schema/delta/52/thread_id2.sql
index 845529fdcd..aa03b6e49d 100644
--- a/synapse/storage/schema/delta/52/thread_id.sql
+++ b/synapse/storage/schema/delta/52/thread_id2.sql
@@ -13,8 +13,11 @@
  * limitations under the License.
  */
 
-ALTER TABLE events ADD COLUMN thread_id BIGINT NOT NULL DEFAULT 0;
+ALTER TABLE events ADD COLUMN IF NOT EXISTS thread_id BIGINT NOT NULL DEFAULT 0;
 
-CREATE INDEX events_room_idx ON events (room_id, thread_id);
+CREATE INDEX IF NOT EXISTS events_room_idx ON events (room_id, thread_id);
 
 -- CREATE SEQUENCE thread_id_seq;
+
+
+CREATE INDEX IF NOT EXISTS event_room_thread_ts ON events (room_id, thread_id, origin_server_ts);