summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-11-21 11:45:11 +0000
committerErik Johnston <erik@matrix.org>2018-11-21 11:45:11 +0000
commite6896040c71f56aa59d8f8a2d1e788d0ad79b4ac (patch)
tree135977cadd92796e8007cb72123ca264082a6533
parentAdd hooks in federation for funky event routing (diff)
parentFix threading when pulling in via get_missing_events (diff)
downloadsynapse-e6896040c71f56aa59d8f8a2d1e788d0ad79b4ac.tar.xz
Merge branch 'erikj/thread_demo' of github.com:matrix-org/synapse into erikj/add_routing_hooks
-rw-r--r--synapse/federation/federation_server.py21
-rw-r--r--synapse/federation/transport/client.py3
-rw-r--r--synapse/handlers/federation.py34
-rw-r--r--synapse/http/matrixfederationclient.py19
-rw-r--r--synapse/storage/schema/delta/52/thread_id.sql20
5 files changed, 70 insertions, 27 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 93c38845f6..e62bdf5bbe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import random
 
 import six
 from six import iteritems
@@ -70,6 +71,7 @@ class FederationServer(FederationBase):
 
         self.auth = hs.get_auth()
         self.handler = hs.get_handlers().federation_handler
+        self.clock = hs.get_clock()
 
         self._server_linearizer = Linearizer("fed_server")
         self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -207,12 +209,26 @@ class FederationServer(FederationBase):
                     pdu_results[event_id] = e.error_dict()
                 return
 
+            thread_id = random.randint(1, 999999999)
+            pdu_to_thread = {}
+            first_in_thread = True
+            for pdu in reversed(pdus_by_room[room_id]):
+                now = self.clock.time_msec()
+                if now - pdu.origin_server_ts > 1 * 60 * 1000:
+                    pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
+                    first_in_thread = False
+                else:
+                    pdu_to_thread[pdu.event_id] = (0, False)
+
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
                 with nested_logging_context(event_id):
+                    thread_id, new_thread = pdu_to_thread[pdu.event_id]
+                    logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
                     try:
                         yield self._handle_received_pdu(
-                            origin, pdu
+                            origin, pdu, thread_id=thread_id,
+                            new_thread=new_thread
                         )
                         pdu_results[event_id] = {}
                     except FederationError as e:
@@ -570,7 +586,7 @@ class FederationServer(FederationBase):
         )
 
     @defer.inlineCallbacks
-    def _handle_received_pdu(self, origin, pdu):
+    def _handle_received_pdu(self, origin, pdu, thread_id, new_thread):
         """ Process a PDU received in a federation /send/ transaction.
 
         If the event is invalid, then this method throws a FederationError.
@@ -613,6 +629,7 @@ class FederationServer(FederationBase):
 
         yield self.handler.on_receive_pdu(
             origin, pdu, sent_to_us_directly=True,
+            thread_id=thread_id, new_thread=new_thread,
         )
 
     def __str__(self):
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index edba5a9808..42ed61470f 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -174,7 +174,8 @@ class TransportLayerClient(object):
             path=path,
             data=json_data,
             json_data_callback=json_data_callback,
-            long_retries=True,
+            long_retries=False,
+            timeout=10000,
             backoff_on_404=True,  # If we get a 404 the other side has gone
         )
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c0bea7a5ed..bbca24f23c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -140,6 +140,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def on_receive_pdu(
             self, origin, pdu, sent_to_us_directly=False, thread_id=None,
+            new_thread=False,
     ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
@@ -235,12 +236,6 @@ class FederationHandler(BaseHandler):
         state = None
         auth_chain = []
 
-        new_thread = False
-        if thread_id is None:
-            # FIXME: Pick something better?
-            thread_id = random.randint(0, 999999999)
-            new_thread = True
-
         # Get missing pdus if necessary.
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
@@ -434,13 +429,6 @@ class FederationHandler(BaseHandler):
                         affected=event_id,
                     )
 
-        now = self.clock.time_msec()
-        if now - pdu.origin_server_ts > 2 * 60 * 1000:
-            pass
-        else:
-            thread_id = 0
-            new_thread = False
-
         logger.info("Thread ID %r", thread_id)
 
         yield self._process_received_pdu(
@@ -472,7 +460,7 @@ class FederationHandler(BaseHandler):
                 create_requester(UserID("server", "server")),
                 event,
                 context,
-                ratelimit=True,
+                ratelimit=False,
                 extra_users=[],
                 do_auth=False,
             )
@@ -574,6 +562,21 @@ class FederationHandler(BaseHandler):
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
+        pdu_to_thread = {}
+        if not thread_id:
+            thread_id = random.randint(1, 999999999)
+            first_in_thread = True
+            for pdu in reversed(missing_events):
+                now = self.clock.time_msec()
+                if now - pdu.origin_server_ts > 1 * 60 * 1000:
+                    pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
+                    first_in_thread = False
+                else:
+                    pdu_to_thread[pdu.event_id] = (0, False)
+        else:
+            for pdu in reversed(missing_events):
+                pdu_to_thread[pdu.event_id] = (thread_id, False)
+
         for ev in missing_events:
             logger.info(
                 "[%s %s] Handling received prev_event %s",
@@ -585,7 +588,8 @@ class FederationHandler(BaseHandler):
                         origin,
                         ev,
                         sent_to_us_directly=False,
-                        thread_id=thread_id,
+                        thread_id=pdu_to_thread[ev.event_id][0],
+                        new_thread=pdu_to_thread[ev.event_id][1],
                     )
                 except FederationError as e:
                     if e.code == 403:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 24b6110c20..d2ca39c71e 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -196,7 +196,7 @@ class MatrixFederationHttpClient(object):
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
         self.version_string_bytes = hs.version_string.encode('ascii')
-        self.default_timeout = 60
+        self.default_timeout = 30
 
         def schedule(x):
             reactor.callLater(_EPSILON, x)
@@ -253,13 +253,13 @@ class MatrixFederationHttpClient(object):
         ):
             raise FederationDeniedError(request.destination)
 
-        limiter = yield synapse.util.retryutils.get_retry_limiter(
-            request.destination,
-            self.clock,
-            self._store,
-            backoff_on_404=backoff_on_404,
-            ignore_backoff=ignore_backoff,
-        )
+        # limiter = yield synapse.util.retryutils.get_retry_limiter(
+        #     request.destination,
+        #     self.clock,
+        #     self._store,
+        #     backoff_on_404=backoff_on_404,
+        #     ignore_backoff=ignore_backoff,
+        # )
 
         method_bytes = request.method.encode("ascii")
         destination_bytes = request.destination.encode("ascii")
@@ -274,7 +274,8 @@ class MatrixFederationHttpClient(object):
             b"Host": [destination_bytes],
         }
 
-        with limiter:
+        # with limiter:
+        if True:
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
             if long_retries:
diff --git a/synapse/storage/schema/delta/52/thread_id.sql b/synapse/storage/schema/delta/52/thread_id.sql
new file mode 100644
index 0000000000..845529fdcd
--- /dev/null
+++ b/synapse/storage/schema/delta/52/thread_id.sql
@@ -0,0 +1,20 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN thread_id BIGINT NOT NULL DEFAULT 0;
+
+CREATE INDEX events_room_idx ON events (room_id, thread_id);
+
+-- CREATE SEQUENCE thread_id_seq;