summary refs log tree commit diff
path: root/synapse/handlers/sync.py
blob: ec20ea4890c5d6213b6d8e30ab45bc0ceb764607 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import collections


SyncConfig = collections.namedtuple("SyncConfig", [
    "user",
    "device",
    "since",
    "limit",
    "gap",
    "sort"
    "backfill"
    "filter",
)


RoomSyncResult = collections.namedtuple("RoomSyncResult", [
    "room_id",
    "limited",
    "published",
    "prev_batch",
    "events",
    "state",
    "event_map",
])


class SyncResult(collections.namedtuple("SyncResult", [
    "next_batch", # Token for the next sync
    "private_user_data", # List of private events for the user.
    "public_user_data", # List of public events for all users.
    "rooms", # RoomSyncResult for each room.
])):
    __slots__ = []

    def __nonzero__(self):
        return self.private_user_data or self.public_user_data or self.rooms


class SyncHandler(BaseHandler):

    def __init__(self, hs):
        super(SyncHandler, self).__init__(hs)
        self.event_sources = hs.get_event_sources()

    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
        if timeout == 0:
            return self.current_sync_for_user(sync_config, since)
        else:
            def current_sync_callback(since_token):
                return self.current_sync_for_user(
                    self, since_token, sync_config
                )
            return self.notifier.wait_for_events(
                sync_config.filter, since_token, current_sync_callback
            )
        defer.returnValue(result)

    def current_sync_for_user(self, sync_config, since_token=None):
        if since_token is None:
            return self.inital_sync(sync_config)
        else:
            return self.incremental_sync(sync_config)

    @defer.inlineCallbacks
    def initial_sync(self, sync_config):
        now_token = yield self.event_sources.get_current_token()

        presence_stream = self.event_sources.sources["presence"]
        # TODO (markjh): This looks wrong, shouldn't we be getting the presence
        # UP to the present rather than after the present?
        pagination_config = PaginationConfig(from_token=now_token)
        presence, _ = yield presence_stream.get_pagination_rows(
            user, pagination_config.get_source_config("presence"), None
        )
        room_list = yield self.store.get_rooms_for_user_where_membership_is(
            user_id=user_id,
            membership_list=[Membership.INVITE, Membership.JOIN]
        )

        # TODO (markjh): Does public mean "published"?
        published_rooms = yield self.store.get_rooms(is_public=True)
        published_room_ids = set(r["room_id"] for r in public_rooms)

        for event in room_list:

            messages, token = yield self.store.get_recent_events_for_room(
                event.room_id,
                limit=sync_config.limit,
                end_token=now_token.room_key,
            )
            prev_batch_token = now_token.copy_and_replace("room_key", token[0])
            current_state = yield self.state_handler.get_current_state(
                event.room_id
            )

            rooms.append(RoomSyncResult(
                room_id=event.room_id,
                published=event.room_id in published_room_ids,





    @defer.inlineCallbacks
    def incremental_sync(self, sync_config):