diff options
author | Erik Johnston <erik@matrix.org> | 2018-11-21 11:45:11 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-11-21 11:45:11 +0000 |
commit | e6896040c71f56aa59d8f8a2d1e788d0ad79b4ac (patch) | |
tree | 135977cadd92796e8007cb72123ca264082a6533 | |
parent | Add hooks in federation for funky event routing (diff) | |
parent | Fix threading when pulling in via get_missing_events (diff) | |
download | synapse-e6896040c71f56aa59d8f8a2d1e788d0ad79b4ac.tar.xz |
Merge branch 'erikj/thread_demo' of github.com:matrix-org/synapse into erikj/add_routing_hooks
-rw-r--r-- | synapse/federation/federation_server.py | 21 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 3 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 34 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 19 | ||||
-rw-r--r-- | synapse/storage/schema/delta/52/thread_id.sql | 20 |
5 files changed, 70 insertions, 27 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 93c38845f6..e62bdf5bbe 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import six from six import iteritems @@ -70,6 +71,7 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self.handler = hs.get_handlers().federation_handler + self.clock = hs.get_clock() self._server_linearizer = Linearizer("fed_server") self._transaction_linearizer = Linearizer("fed_txn_handler") @@ -207,12 +209,26 @@ class FederationServer(FederationBase): pdu_results[event_id] = e.error_dict() return + thread_id = random.randint(1, 999999999) + pdu_to_thread = {} + first_in_thread = True + for pdu in reversed(pdus_by_room[room_id]): + now = self.clock.time_msec() + if now - pdu.origin_server_ts > 1 * 60 * 1000: + pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) + first_in_thread = False + else: + pdu_to_thread[pdu.event_id] = (0, False) + for pdu in pdus_by_room[room_id]: event_id = pdu.event_id with nested_logging_context(event_id): + thread_id, new_thread = pdu_to_thread[pdu.event_id] + logger.info("Assigning thread %d to %s", thread_id, pdu.event_id) try: yield self._handle_received_pdu( - origin, pdu + origin, pdu, thread_id=thread_id, + new_thread=new_thread ) pdu_results[event_id] = {} except FederationError as e: @@ -570,7 +586,7 @@ class FederationServer(FederationBase): ) @defer.inlineCallbacks - def _handle_received_pdu(self, origin, pdu): + def _handle_received_pdu(self, origin, pdu, thread_id, new_thread): """ Process a PDU received in a federation /send/ transaction. If the event is invalid, then this method throws a FederationError. @@ -613,6 +629,7 @@ class FederationServer(FederationBase): yield self.handler.on_receive_pdu( origin, pdu, sent_to_us_directly=True, + thread_id=thread_id, new_thread=new_thread, ) def __str__(self): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index edba5a9808..42ed61470f 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -174,7 +174,8 @@ class TransportLayerClient(object): path=path, data=json_data, json_data_callback=json_data_callback, - long_retries=True, + long_retries=False, + timeout=10000, backoff_on_404=True, # If we get a 404 the other side has gone ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c0bea7a5ed..bbca24f23c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -140,6 +140,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_receive_pdu( self, origin, pdu, sent_to_us_directly=False, thread_id=None, + new_thread=False, ): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -235,12 +236,6 @@ class FederationHandler(BaseHandler): state = None auth_chain = [] - new_thread = False - if thread_id is None: - # FIXME: Pick something better? - thread_id = random.randint(0, 999999999) - new_thread = True - # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. @@ -434,13 +429,6 @@ class FederationHandler(BaseHandler): affected=event_id, ) - now = self.clock.time_msec() - if now - pdu.origin_server_ts > 2 * 60 * 1000: - pass - else: - thread_id = 0 - new_thread = False - logger.info("Thread ID %r", thread_id) yield self._process_received_pdu( @@ -472,7 +460,7 @@ class FederationHandler(BaseHandler): create_requester(UserID("server", "server")), event, context, - ratelimit=True, + ratelimit=False, extra_users=[], do_auth=False, ) @@ -574,6 +562,21 @@ class FederationHandler(BaseHandler): # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) + pdu_to_thread = {} + if not thread_id: + thread_id = random.randint(1, 999999999) + first_in_thread = True + for pdu in reversed(missing_events): + now = self.clock.time_msec() + if now - pdu.origin_server_ts > 1 * 60 * 1000: + pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) + first_in_thread = False + else: + pdu_to_thread[pdu.event_id] = (0, False) + else: + for pdu in reversed(missing_events): + pdu_to_thread[pdu.event_id] = (thread_id, False) + for ev in missing_events: logger.info( "[%s %s] Handling received prev_event %s", @@ -585,7 +588,8 @@ class FederationHandler(BaseHandler): origin, ev, sent_to_us_directly=False, - thread_id=thread_id, + thread_id=pdu_to_thread[ev.event_id][0], + new_thread=pdu_to_thread[ev.event_id][1], ) except FederationError as e: if e.code == 403: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 24b6110c20..d2ca39c71e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -196,7 +196,7 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self._store = hs.get_datastore() self.version_string_bytes = hs.version_string.encode('ascii') - self.default_timeout = 60 + self.default_timeout = 30 def schedule(x): reactor.callLater(_EPSILON, x) @@ -253,13 +253,13 @@ class MatrixFederationHttpClient(object): ): raise FederationDeniedError(request.destination) - limiter = yield synapse.util.retryutils.get_retry_limiter( - request.destination, - self.clock, - self._store, - backoff_on_404=backoff_on_404, - ignore_backoff=ignore_backoff, - ) + # limiter = yield synapse.util.retryutils.get_retry_limiter( + # request.destination, + # self.clock, + # self._store, + # backoff_on_404=backoff_on_404, + # ignore_backoff=ignore_backoff, + # ) method_bytes = request.method.encode("ascii") destination_bytes = request.destination.encode("ascii") @@ -274,7 +274,8 @@ class MatrixFederationHttpClient(object): b"Host": [destination_bytes], } - with limiter: + # with limiter: + if True: # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: diff --git a/synapse/storage/schema/delta/52/thread_id.sql b/synapse/storage/schema/delta/52/thread_id.sql new file mode 100644 index 0000000000..845529fdcd --- /dev/null +++ b/synapse/storage/schema/delta/52/thread_id.sql @@ -0,0 +1,20 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN thread_id BIGINT NOT NULL DEFAULT 0; + +CREATE INDEX events_room_idx ON events (room_id, thread_id); + +-- CREATE SEQUENCE thread_id_seq; |