summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-11-21 10:45:35 +0000
committerErik Johnston <erik@matrix.org>2018-11-21 10:45:35 +0000
commitd0d3c63705d72c051cb3e03ec7f717d5a6051179 (patch)
treed060caa56856637008a9d43b29cce3fc8c17add5
parentSend down new thread marker (diff)
downloadsynapse-d0d3c63705d72c051cb3e03ec7f717d5a6051179.tar.xz
Fix threading when pulling in via get_missing_events
-rw-r--r--synapse/federation/federation_server.py3
-rw-r--r--synapse/handlers/federation.py18
2 files changed, 19 insertions, 2 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9e064b2e57..605597acab 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -210,7 +210,7 @@ class FederationServer(FederationBase):
                     pdu_results[event_id] = e.error_dict()
                 return
 
-            thread_id = random.randint(0, 999999999)
+            thread_id = random.randint(1, 999999999)
             pdu_to_thread = {}
             first_in_thread = True
             for pdu in reversed(pdus_by_room[room_id]):
@@ -225,6 +225,7 @@ class FederationServer(FederationBase):
                 event_id = pdu.event_id
                 with nested_logging_context(event_id):
                     thread_id, new_thread = pdu_to_thread[pdu.event_id]
+                    logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
                     try:
                         yield self._handle_received_pdu(
                             origin, pdu, thread_id=thread_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 68e13673fd..9e470f8614 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -551,6 +551,21 @@ class FederationHandler(BaseHandler):
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
+        pdu_to_thread = {}
+        if not thread_id:
+            thread_id = random.randint(1, 999999999)
+            first_in_thread = True
+            for pdu in reversed(missing_events):
+                now = self.clock.time_msec()
+                if now - pdu.origin_server_ts > 1 * 60 * 1000:
+                    pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
+                    first_in_thread = False
+                else:
+                    pdu_to_thread[pdu.event_id] = (0, False)
+        else:
+            for pdu in reversed(missing_events):
+                pdu_to_thread[pdu.event_id] = (thread_id, False)
+
         for ev in missing_events:
             logger.info(
                 "[%s %s] Handling received prev_event %s",
@@ -562,7 +577,8 @@ class FederationHandler(BaseHandler):
                         origin,
                         ev,
                         sent_to_us_directly=False,
-                        thread_id=thread_id,
+                        thread_id=pdu_to_thread[ev.event_id][0],
+                        new_thread=pdu_to_thread[ev.event_id][1],
                     )
                 except FederationError as e:
                     if e.code == 403: