diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-05-31 13:23:58 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-05-31 13:23:58 +0100 |
commit | 86a746982bbf85f56d33af84172035d13df92c62 (patch) | |
tree | 7495bba24e70b811fc2f0eb5efb28119d1b61a10 /synapse/app | |
parent | Merge remote-tracking branch 'origin/develop' into markjh/synchrotron (diff) | |
download | synapse-86a746982bbf85f56d33af84172035d13df92c62.tar.xz |
Poke the notifier to wake up /syncs
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/synchrotron.py | 21 |
1 files changed, 21 insertions, 0 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 623a73f67a..6a0592f41d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,10 +16,12 @@ import synapse +from synapse.api.constants import EventTypes from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig from synapse.config.appservice import AppServiceConfig +from synapse.events import FrozenEvent from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX @@ -49,6 +51,7 @@ from daemonize import Daemonize import sys import logging import contextlib +import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -226,10 +229,27 @@ class SynchrotronServer(HomeServer): store = self.get_datastore() replication_url = self.config.replication_url clock = self.get_clock() + notifier = self.get_notifier() def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() + def notify(result): + stream = result.get("events") + if stream: + max_position = stream["position"] + for row in stream["rows"]: + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + notifier.on_new_room_event( + event, position, max_position, extra_users + ) + next_expire_broken_caches_ms = 0 while True: try: @@ -244,6 +264,7 @@ class SynchrotronServer(HomeServer): now_ms + store.BROKEN_CACHE_EXPIRY_MS ) yield store.process_replication(result) + notify(result) except: logger.exception("Error replicating from %r", replication_url) sleep(5) |