hat.event.backends.lmdb.manager.query

  1from pathlib import Path
  2import argparse
  3
  4from hat import json
  5
  6from hat.event.backends.lmdb.manager import common
  7
  8
  9def create_argument_parser(subparsers) -> argparse.ArgumentParser:
 10    parser = subparsers.add_parser('query')
 11    parser.add_argument('--db', metavar='PATH', type=Path, required=True)
 12    subsubparsers = parser.add_subparsers(dest='subaction', required=True)
 13
 14    subsubparsers.add_parser('settings')
 15
 16    last_event_id_parser = subsubparsers.add_parser('last_event_id')
 17    last_event_id_parser.add_argument('--server-id', type=int, default=None)
 18
 19    last_timestamps_parser = subsubparsers.add_parser('last_timestamp')
 20    last_timestamps_parser.add_argument('--server-id', type=int, default=None)
 21
 22    ref_parser = subsubparsers.add_parser('ref')
 23    ref_parser.add_argument('--server-id', type=int, default=None)
 24
 25    latest_parser = subsubparsers.add_parser('latest')
 26    latest_parser.add_argument('--event-types', type=str, default=None,
 27                               nargs='*')
 28
 29    partition_parser = subsubparsers.add_parser('partition')
 30    partition_parser.add_argument('--partition-id', type=int, default=None)
 31
 32    timeseries_parser = subsubparsers.add_parser('timeseries')
 33    timeseries_parser.add_argument('--partition-id', type=int, default=None)
 34    timeseries_parser.add_argument('--server-id', type=int, default=None)
 35    timeseries_parser.add_argument('--event-types', type=str, default=None,
 36                                   nargs='*')
 37
 38    return parser
 39
 40
 41def query(args):
 42    with common.ext_create_env(args.db, readonly=True) as env:
 43        dbs = {db_type: common.ext_open_db(env, db_type, False)
 44               for db_type in common.DbType}
 45
 46        with env.begin(buffers=True) as txn:
 47            if args.subaction == 'settings':
 48                for result in _query_settings(dbs, txn):
 49                    _print_result(result)
 50
 51            elif args.subaction == 'last_event_id':
 52                for result in _query_last_event_id(dbs, txn, args.server_id):
 53                    _print_result(result)
 54
 55            elif args.subaction == 'last_timestamp':
 56                for result in _query_last_timestamp(dbs, txn, args.server_id):
 57                    _print_result(result)
 58
 59            elif args.subaction == 'ref':
 60                for result in _query_ref(dbs, txn):
 61                    _print_result(result)
 62
 63            elif args.subaction == 'latest':
 64                event_types = ([tuple(i.split('/')) for i in args.event_types]
 65                               if args.event_types is not None else None)
 66                for result in _query_ref(dbs, txn, event_types):
 67                    _print_result(result)
 68
 69            elif args.subaction == 'partition':
 70                for result in _query_partition(dbs, txn, args.partition_id):
 71                    _print_result(result)
 72
 73            elif args.subaction == 'timeseries':
 74                event_types = ([tuple(i.split('/')) for i in args.event_types]
 75                               if args.event_types is not None else None)
 76                for result in _query_timeseries(dbs, txn, args.partition_id,
 77                                                args.server_id, event_types):
 78                    _print_result(result)
 79
 80            else:
 81                raise ValueError('unsupported subaction')
 82
 83
 84def _print_result(result):
 85    print(json.encode(result))
 86
 87
 88def _query_settings(dbs, txn):
 89    db_type = common.DbType.SYSTEM_SETTINGS
 90    db = dbs[db_type]
 91    db_def = common.db_defs[db_type]
 92
 93    for key, value in txn.cursor(db=db):
 94        settings_id = db_def.decode_key(key)
 95        data = db_def.decode_value(value)
 96
 97        yield {'settings_id': settings_id.name,
 98               'data': data}
 99
100
101def _query_last_event_id(dbs, txn, server_id):
102    db_type = common.DbType.SYSTEM_LAST_EVENT_ID
103    db = dbs[db_type]
104    db_def = common.db_defs[db_type]
105
106    for key, value in txn.cursor(db=db):
107        decoded_server_id = db_def.decode_key(key)
108        if server_id is not None and server_id != decoded_server_id:
109            continue
110
111        event_id = db_def.decode_value(value)
112        yield common.event_id_to_json(event_id)
113
114
115def _query_last_timestamp(dbs, txn, server_id):
116    db_type = common.DbType.SYSTEM_LAST_TIMESTAMP
117    db = dbs[db_type]
118    db_def = common.db_defs[db_type]
119
120    for key, value in txn.cursor(db=db):
121        decoded_server_id = db_def.decode_key(key)
122        if server_id is not None and server_id != decoded_server_id:
123            continue
124
125        timestamp = db_def.decode_value(value)
126        yield common.timestamp_to_json(timestamp)
127
128
129def _query_ref(dbs, txn, server_id):
130    ref_db_type = common.DbType.REF
131    ref_db = dbs[ref_db_type]
132    ref_db_def = common.db_defs[ref_db_type]
133
134    latest_type_db_type = common.DbType.LATEST_TYPE
135    latest_type_db = dbs[latest_type_db_type]
136    latest_type_db_def = common.db_defs[latest_type_db_type]
137
138    for key, value in txn.cursor(db=ref_db):
139        event_id = ref_db_def.decode_key(key)
140        if server_id is not None and event_id.server != server_id:
141            continue
142
143        event_refs = ref_db_def.decode_value(value)
144
145        for event_ref in event_refs:
146            if isinstance(event_ref, common.LatestEventRef):
147                latest_type_key = latest_type_db_def.encoded_key(event_ref.key)
148                latest_type_value = txn.get(latest_type_key, db=latest_type_db)
149
150                event_type = latest_type_db_def.decode_value(latest_type_value)
151                yield {'event_id': common.event_id_to_json(event_id),
152                       'ref_type': 'latest',
153                       'event_type': list(event_type)}
154
155            if isinstance(event_ref, common.TimeseriesEventRef):
156                partition_id, timestamp, _ = event_ref.key
157                yield {'event_id': common.event_id_to_json(event_id),
158                       'ref_type': 'timeseries',
159                       'partition_id': partition_id,
160                       'timestamp': common.timestamp_to_json(timestamp)}
161
162
163def _query_latest(dbs, txn, event_types):
164    db_type = common.DbType.LATEST_DATA
165    db = dbs[db_type]
166    db_def = common.db_defs[db_type]
167
168    subscription = common.create_subscription([('*', )] if event_types is None
169                                              else event_types)
170
171    for _, value in txn.cursor(db=db):
172        event = db_def.decode_value(value)
173        if not subscription.matches(event.type):
174            continue
175
176        yield common.event_to_json(value)
177
178
179def _query_partition(dbs, txn, partition_id):
180    db_type = common.DbType.TIMESERIES_PARTITION
181    db = dbs[db_type]
182    db_def = common.db_defs[db_type]
183
184    for key, value in txn.cursor(db=db):
185        decoded_partition_id = db_def.decode_key(key)
186        if partition_id is not None and partition_id != decoded_partition_id:
187            continue
188
189        data = db_def.decode_value(value)
190        yield {'partition_id': decoded_partition_id,
191               'data': data}
192
193
194def _query_timeseries(dbs, txn, partition_id, server_id, event_types):
195    db_type = common.DbType.TIMESERIES_DATA
196    db = dbs[db_type]
197    db_def = common.db_defs[db_type]
198
199    subscription = common.create_subscription([('*', )] if event_types is None
200                                              else event_types)
201
202    for key, value in txn.cursor(db=db):
203        decoded_partition_id, _, event_id = db_def.decode(key)
204
205        if partition_id is not None and partition_id != decoded_partition_id:
206            continue
207
208        if server_id is not None and server_id != event_id.server:
209            continue
210
211        event = db_def.decode_value(value)
212        if not subscription.matches(event.type):
213            continue
214
215        yield common.event_to_json(value)
def create_argument_parser(subparsers) -> argparse.ArgumentParser:
10def create_argument_parser(subparsers) -> argparse.ArgumentParser:
11    parser = subparsers.add_parser('query')
12    parser.add_argument('--db', metavar='PATH', type=Path, required=True)
13    subsubparsers = parser.add_subparsers(dest='subaction', required=True)
14
15    subsubparsers.add_parser('settings')
16
17    last_event_id_parser = subsubparsers.add_parser('last_event_id')
18    last_event_id_parser.add_argument('--server-id', type=int, default=None)
19
20    last_timestamps_parser = subsubparsers.add_parser('last_timestamp')
21    last_timestamps_parser.add_argument('--server-id', type=int, default=None)
22
23    ref_parser = subsubparsers.add_parser('ref')
24    ref_parser.add_argument('--server-id', type=int, default=None)
25
26    latest_parser = subsubparsers.add_parser('latest')
27    latest_parser.add_argument('--event-types', type=str, default=None,
28                               nargs='*')
29
30    partition_parser = subsubparsers.add_parser('partition')
31    partition_parser.add_argument('--partition-id', type=int, default=None)
32
33    timeseries_parser = subsubparsers.add_parser('timeseries')
34    timeseries_parser.add_argument('--partition-id', type=int, default=None)
35    timeseries_parser.add_argument('--server-id', type=int, default=None)
36    timeseries_parser.add_argument('--event-types', type=str, default=None,
37                                   nargs='*')
38
39    return parser
def query(args):
42def query(args):
43    with common.ext_create_env(args.db, readonly=True) as env:
44        dbs = {db_type: common.ext_open_db(env, db_type, False)
45               for db_type in common.DbType}
46
47        with env.begin(buffers=True) as txn:
48            if args.subaction == 'settings':
49                for result in _query_settings(dbs, txn):
50                    _print_result(result)
51
52            elif args.subaction == 'last_event_id':
53                for result in _query_last_event_id(dbs, txn, args.server_id):
54                    _print_result(result)
55
56            elif args.subaction == 'last_timestamp':
57                for result in _query_last_timestamp(dbs, txn, args.server_id):
58                    _print_result(result)
59
60            elif args.subaction == 'ref':
61                for result in _query_ref(dbs, txn):
62                    _print_result(result)
63
64            elif args.subaction == 'latest':
65                event_types = ([tuple(i.split('/')) for i in args.event_types]
66                               if args.event_types is not None else None)
67                for result in _query_ref(dbs, txn, event_types):
68                    _print_result(result)
69
70            elif args.subaction == 'partition':
71                for result in _query_partition(dbs, txn, args.partition_id):
72                    _print_result(result)
73
74            elif args.subaction == 'timeseries':
75                event_types = ([tuple(i.split('/')) for i in args.event_types]
76                               if args.event_types is not None else None)
77                for result in _query_timeseries(dbs, txn, args.partition_id,
78                                                args.server_id, event_types):
79                    _print_result(result)
80
81            else:
82                raise ValueError('unsupported subaction')