1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
import collections
# Immutable bundle of the parameters describing one sync request.
# Within this file, ``limit`` is passed as the per-room recent-event limit
# and ``filter`` is handed to the notifier when waiting for events.
# Each field name needs its own trailing comma: adjacent string literals are
# silently concatenated by Python, which would fuse "sort", "backfill" and
# "filter" into a single bogus field.
SyncConfig = collections.namedtuple("SyncConfig", [
    "user",
    "device",
    "since",
    "limit",
    "gap",
    "sort",
    "backfill",
    "filter",
])
# Record holding the sync data for a single room. Field order matters to
# positional callers: room_id, limited, published, prev_batch, events,
# state, event_map.
RoomSyncResult = collections.namedtuple(
    "RoomSyncResult",
    "room_id limited published prev_batch events state event_map",
)
class SyncResult(collections.namedtuple("SyncResult", [
    "next_batch",  # Token for the next sync
    "private_user_data",  # List of private events for the user.
    "public_user_data",  # List of public events for all users.
    "rooms",  # RoomSyncResult for each room.
])):
    """Result of a sync.

    The truth value of an instance reflects whether it carries any data:
    it is false when ``private_user_data``, ``public_user_data`` and
    ``rooms`` are all empty. ``next_batch`` does not affect truthiness.
    """

    __slots__ = []

    def __nonzero__(self):
        """Return True if any of the data fields is non-empty.

        The result is coerced with ``bool()`` because the truth-value hook
        must return a bool (Python 2 raises TypeError if it returns a list).
        """
        return bool(
            self.private_user_data or self.public_user_data or self.rooms
        )

    # Python 3 looks up __bool__ rather than __nonzero__; without this alias
    # the instance would fall back to tuple truthiness and always be true.
    __bool__ = __nonzero__
class SyncHandler(BaseHandler):
def __init__(self, hs):
    """Initialise the handler from ``hs``.

    Runs the parent class initialiser with ``hs`` first, then stores the
    object returned by ``hs.get_event_sources()`` as ``self.event_sources``.
    """
    super(SyncHandler, self).__init__(hs)
    event_sources = hs.get_event_sources()
    self.event_sources = event_sources
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
    """Get a sync result for the user, optionally waiting for new events.

    Args:
        sync_config: The sync request parameters; ``sync_config.filter`` is
            passed to the notifier when waiting.
        since_token: Token the client is syncing from, or None.
        timeout: When 0 the current state is computed immediately; any other
            value delegates to ``self.notifier.wait_for_events``.

    Returns:
        Whatever ``current_sync_for_user`` returns (timeout == 0), otherwise
        whatever ``self.notifier.wait_for_events`` returns.
    """
    if timeout == 0:
        # No waiting requested: compute the sync from the supplied token.
        return self.current_sync_for_user(sync_config, since_token)

    def current_sync_callback(since_token):
        # The notifier supplies the token to sync from; this parameter
        # intentionally shadows the outer ``since_token``. ``self`` is
        # already bound, so only (sync_config, since_token) are passed.
        return self.current_sync_for_user(sync_config, since_token)

    # NOTE(review): the ``timeout`` value itself is not forwarded to the
    # notifier here -- confirm against the wait_for_events signature.
    return self.notifier.wait_for_events(
        sync_config.filter, since_token, current_sync_callback
    )
def current_sync_for_user(self, sync_config, since_token=None):
    """Compute the sync for a user without waiting for new events.

    Args:
        sync_config: The sync request parameters, forwarded unchanged.
        since_token: Token the client is syncing from. None selects an
            initial sync; any other value selects an incremental sync.

    Returns:
        The result of ``self.initial_sync(sync_config)`` or
        ``self.incremental_sync(sync_config)``.
    """
    if since_token is None:
        return self.initial_sync(sync_config)
    # NOTE(review): since_token is not forwarded; incremental_sync is
    # declared in this file as taking only sync_config -- confirm intent.
    return self.incremental_sync(sync_config)
# Gathers data for a sync starting at the current stream position: the
# current token, presence rows, the rooms returned for the INVITE/JOIN
# membership query, the set of published room ids, and per room the recent
# events plus current state.
# NOTE(review): the visible body is unfinished -- it is cut off inside the
# final RoomSyncResult(...) call and several names below are never bound.
@defer.inlineCallbacks
def initial_sync(self, sync_config):
# Snapshot of the stream position that every query below is anchored to.
now_token = yield self.event_sources.get_current_token()
presence_stream = self.event_sources.sources["presence"]
# TODO (markjh): This looks wrong, shouldn't we be getting the presence
# UP to the present rather than after the present?
pagination_config = PaginationConfig(from_token=now_token)
# NOTE(review): `user` is not bound anywhere in the visible scope of this
# function (parameters are only self and sync_config) -- presumably meant
# to come from sync_config; confirm.
presence, _ = yield presence_stream.get_pagination_rows(
user, pagination_config.get_source_config("presence"), None
)
# NOTE(review): `user_id` is likewise not bound in the visible scope.
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=user_id,
membership_list=[Membership.INVITE, Membership.JOIN]
)
# TODO (markjh): Does public mean "published"?
published_rooms = yield self.store.get_rooms(is_public=True)
# NOTE(review): iterates `public_rooms`, but the value assigned on the
# previous line is named `published_rooms` -- looks like a naming slip.
published_room_ids = set(r["room_id"] for r in public_rooms)
# One pass per entry of room_list; only `event.room_id` is read from it.
for event in room_list:
messages, token = yield self.store.get_recent_events_for_room(
event.room_id,
limit=sync_config.limit,
end_token=now_token.room_key,
)
# Same token as now_token but with room_key swapped for token[0].
prev_batch_token = now_token.copy_and_replace("room_key", token[0])
current_state = yield self.state_handler.get_current_state(
event.room_id
)
# NOTE(review): `rooms` is never initialised in the visible code, and the
# call below is truncated after the `published` argument.
rooms.append(RoomSyncResult(
room_id=event.room_id,
published=event.room_id in published_room_ids,
@defer.inlineCallbacks
def incremental_sync(self, sync_config):
|