diff options
author | Erik Johnston <erik@matrix.org> | 2018-11-13 15:33:54 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-11-13 15:33:54 +0000 |
commit | 822fcc3bb83ae16751bd08cf6d21651f4ac7b312 (patch) | |
tree | c022dff6bfc6465da03d0392b31ead3bed203551 | |
parent | Fix sync for archived rooms (diff) | |
download | synapse-822fcc3bb83ae16751bd08cf6d21651f4ac7b312.tar.xz |
Add concept of internal events
-rw-r--r-- | synapse/events/__init__.py | 3 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 3 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 32 | ||||
-rw-r--r-- | synapse/handlers/message.py | 4 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/sync.py | 15 | ||||
-rw-r--r-- | synapse/storage/events.py | 1 |
6 files changed, 42 insertions, 16 deletions
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 84c75495d5..d70942f654 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -44,6 +44,9 @@ class _EventInternalMetadata(object): def is_invite_from_remote(self): return getattr(self, "invite_from_remote", False) + def is_internal_event(self): + return getattr(self, "internal_event", False) + def get_send_on_behalf_of(self): """Whether this server should send the event on behalf of another server. This is used by the federation "send_join" API to forward the initial join diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 099ace28c1..dd85c0f4e8 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -175,6 +175,9 @@ class TransactionQueue(object): if not is_mine and send_on_behalf_of is None: return + if event.internal_metadata.is_internal_event(): + return + try: # Get the state from before the event. # We need to make sure that this is the state from before diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 24189d85c9..c37a3b8dca 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -55,13 +55,14 @@ from synapse.replication.http.federation import ( ) from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store -from synapse.types import UserID, get_domain_from_id +from synapse.types import UserID, create_requester, get_domain_from_id from synapse.util import logcontext, unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room from synapse.util.frozenutils import unfreeze from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination +from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_server from ._base import BaseHandler @@ -223,9 +224,11 @@ class FederationHandler(BaseHandler): state = None auth_chain = [] + new_thread = False if thread_id is None: # FIXME: Pick something better? thread_id = random.randint(0, 999999999) + new_thread = True # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): @@ -425,6 +428,7 @@ class FederationHandler(BaseHandler): pass else: thread_id = 0 + new_thread = False logger.info("Thread ID %r", thread_id) @@ -436,6 +440,32 @@ class FederationHandler(BaseHandler): thread_id=thread_id, ) + if new_thread: + builder = self.event_builder_factory.new({ + "type": "org.matrix.new_thread", + "content": { + "thread_id": thread_id, + "latest_event": pdu.event_id, + }, + "event_id": random_string(24), + "origin_server_ts": self.clock.time_msec(), + "sender": "@server:server", + "room_id": pdu.room_id, + }) + + event, context = yield self.event_creation_handler.create_new_client_event( + builder=builder, + ) + event.internal_metadata.internal_event = True + yield self.event_creation_handler.handle_new_client_event( + create_requester(UserID("server", "server")), + event, + context, + ratelimit=True, + extra_users=[], + do_auth=False, + ) + @defer.inlineCallbacks def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth, thread_id): """ diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a7cd779b02..54ffaeb087 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -588,6 +588,7 @@ class EventCreationHandler(object): context, ratelimit=True, extra_users=[], + do_auth=True, ): """Processes a new event. This includes checking auth, persisting it, notifying users, sending to remote servers, etc. @@ -604,7 +605,8 @@ class EventCreationHandler(object): """ try: - yield self.auth.check_from_context(event, context) + if do_auth: + yield self.auth.check_from_context(event, context) except AuthError as err: logger.warn("Denying new event %r because %s", event, err) raise err diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 5279bc84be..5a7e85eda7 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -32,7 +32,6 @@ from synapse.handlers.presence import format_user_presence_state from synapse.handlers.sync import SyncConfig from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string from synapse.types import StreamToken -from synapse.util.stringutils import random_string from ._base import client_v2_patterns, set_timeline_upper_limit @@ -387,22 +386,10 @@ class SyncRestServlet(RestServlet): if exclude_threaded: serialized_timeline = [] - seen_threads = set() for e in reversed(timeline_events): thread_id = e.internal_metadata.thread_id if thread_id != 0: - if thread_id not in seen_threads: - serialized_timeline.append({ - "type": "org.matrix.new_thread", - "content": { - "thread_id": thread_id, - "latest_event": e.event_id, - }, - "event_id": random_string(24), - "origin_server_ts": e.origin_server_ts, - "sender": "@server", - }) - seen_threads.add(thread_id) + pass else: serialized_timeline.append(serialize(e)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 855f859115..904f9305ee 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -537,6 +537,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore new_events = [ event for event, ctx in event_contexts if not event.internal_metadata.is_outlier() and not ctx.rejected + and not event.internal_metadata.is_internal_event() ] # start with the existing forward extremities |