Data synchronization

Post objects

Data in Monitoring Posts is averaged data. Consider twenty-minute averages on a device that takes one measurement per minute. The average reaches its final state only after the server has received all twenty data packets from the device. Each packet the server receives during the twenty-minute interval changes the average.
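
To illustrate why an average is provisional until the interval closes, here is a minimal sketch. The function name `running_averages` and the sample readings are illustrative only and not part of the API. It also assumes the server reports a plain arithmetic mean of the packets received so far, which the text above does not specify.

```python
def running_averages(readings):
    """Return the mean of the readings seen so far, after each new reading."""
    averages = []
    total = 0.0
    for count, value in enumerate(readings, start=1):
        total += value
        averages.append(total / count)
    return averages


# Hypothetical one-minute readings arriving during one averaging interval.
readings = [10.0, 12.0, 14.0, 20.0]
for n, avg in enumerate(running_averages(readings), start=1):
    print(f'after packet {n}: average = {avg}')
```

Every new packet moves the reported value, so a client that fetched the average early holds a number that will later be superseded.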

What does this mean in practice?

This means that synchronizing data by date range can produce errors, because averages that were already fetched may change later. To solve this issue, we propose the following way to store, update, and retrieve data using the API.

  1. In this approach, the unique data key is the pair (post object identifier, averaging time).
  2. To track changes in the averages of already obtained data, we suggest using the Version field. Version is the version of the averaged data, which:
    1. Increases monotonically across the whole database.
    2. Changes whenever the data for some timestamp is updated.
  3. Synchronization is performed with a page-by-page query whose parameters include the last version value saved in your database (query parameter ?version__gt=). Repeated queries can return different measurement values for the same key (post object, averaging time). So when synchronizing, always check whether data has already been saved for the key and, if it has, update it instead of adding a duplicate.
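
The steps above can be sketched with an in-memory dictionary standing in for the database. This is an illustration under stated assumptions, not the API's own client: the function name `apply_page` and the field names `post_id`, `time`, `version` and `value` are hypothetical.

```python
def apply_page(store, packets, last_version):
    """Upsert packets into store and return the highest version seen.

    store maps (post_id, time) -> packet dict. The field names used here
    are assumptions made for this sketch.
    """
    for packet in packets:
        key = (packet['post_id'], packet['time'])
        existing = store.get(key)
        # Add a new record, or overwrite an older version of the same key.
        if existing is None or packet['version'] > existing['version']:
            store[key] = packet
        last_version = max(last_version, packet['version'])
    return last_version


store = {}
last_version = 0

# First query: the interval is still open, so the average is provisional.
page = [{'post_id': 1, 'time': '16:20', 'version': 101, 'value': 5.0}]
last_version = apply_page(store, page, last_version)

# Next query (version__gt=101): same key, newer version and value.
page = [{'post_id': 1, 'time': '16:20', 'version': 102, 'value': 6.5}]
last_version = apply_page(store, page, last_version)

print(len(store), last_version, store[(1, '16:20')]['value'])
```

The store ends up with a single record for the key, holding the latest value, and `last_version` is ready to be sent with the next request.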

An example of a situation in which a previously obtained average changes in the next query:

  1. Between 4:00 pm and 4:10 pm, post object #1 received 10 packets from a station object that sends data once a minute.
  2. A request to the API at 4:10 pm returns the average, stamped with the end of the interval:
    1. Post object #1
    2. Time - 4:20 pm
    3. Version - 101
    4. Average value of some indicator - x
  3. The post object continues to receive data from 4:10 pm to 4:20 pm: 10 more packets.
  4. A request to the API at 4:20 pm with the parameter version__gt=101 returns:
    1. Post object #1
    2. Time - 4:20 pm
    3. Version - 102
    4. Average value of the same indicator - y

In this case, use the key (post object #1, time 4:20 pm) to check for existing data. If data exists (as in our case), update it. Otherwise, just add it. You also need to save the last version value (102) for future API requests.

The same situation can also occur when a device has network problems. Once communication is restored, the device sends the values it measured earlier, and the averaged data changes.

Example of data storage for the Django framework


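def save_packet(date, post_id, version, measurements):
    # Packet is assumed to be a Django model with the fields used below.
    # .first() returns None when no row matches; .get() would raise DoesNotExist.
    packet = Packet.objects.filter(date=date, post_id=post_id).first()
    if packet is None:
        Packet.objects.create(date=date, post_id=post_id, version=version, measurements=measurements)
    else:
        # Keep the stored version current so its maximum can be used in later queries.
        packet.version = version
        packet.measurements = measurements
        packet.save()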

In this example, to get the version for subsequent queries, select the maximum version value among the stored packets.
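
The same store-and-update logic can be sketched without Django, using the standard-library `sqlite3` module. The table name, column names and sample values are assumptions for illustration. The composite primary key mirrors the (post object, averaging time) key, and `MAX(version)` gives the value to pass as `version__gt` in the next request.

```python
import json
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE packet ('
    ' post_id INTEGER NOT NULL,'
    ' date TEXT NOT NULL,'
    ' version INTEGER NOT NULL,'
    ' measurements TEXT NOT NULL,'
    ' PRIMARY KEY (post_id, date))'
)


def save_packet(conn, date, post_id, version, measurements):
    """Insert the packet, or replace the stored one with the same key."""
    conn.execute(
        'INSERT OR REPLACE INTO packet (post_id, date, version, measurements)'
        ' VALUES (?, ?, ?, ?)',
        (post_id, date, version, json.dumps(measurements)),
    )


def last_saved_version(conn, default=0):
    """Return the highest stored version, or default for an empty table."""
    (version,) = conn.execute('SELECT MAX(version) FROM packet').fetchone()
    return default if version is None else version


# Hypothetical data: the same key arrives twice with different versions.
save_packet(conn, '2024-01-01T16:20:00', 1, 101, {'value': 5.0})
save_packet(conn, '2024-01-01T16:20:00', 1, 102, {'value': 6.5})
print(last_saved_version(conn))
```

Letting the database enforce the key means a repeated packet can never create a duplicate row, even if the application-level check is skipped.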

Example of getting data from the API using page-by-page output:



import os
import json

import requests

post_list_url = 'https://mycityair.ru/harvester/v2/Posts'
api_key = os.getenv('CITYAIR_TOKEN') # or set your api key directly

assert api_key, 'API key is empty'

headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {api_key}',
}


def make_request(url):
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # raise on HTTP 4xx/5xx responses
    return response.json()


def pretty_print(data: dict):
    formatted_response = json.dumps(data, indent=2, ensure_ascii=False)
    print(formatted_response)


def build_measurement_url(post_id, version, limit):
    interval = '1h'
    return f'https://mycityair.ru/harvester/v2/Posts/{post_id}/measurements?' \
           f'interval={interval}&' \
           f'limit={limit}&' \
           f'version__gt={version}'


posts = make_request(post_list_url)
assert len(posts) > 0, 'At least one post is required'
post = posts[0]
print(f'Got post {post["id"]}:{post["name"]}')

last_version_id = 1  # for the first sync process
for i in range(5):  # five iterations, just for an example
    url = build_measurement_url(post['id'], last_version_id, limit=2)
    response = make_request(url)
    print(f'Iteration #{i}')
    if len(response['data']) == 0:  # an empty page means there is nothing newer
        print("No new data on server")
        break
    pretty_print(response['data'])
    for packet in response['data']:
        # save data
        pass
    # remember the highest version on this page for the next request
    last_version_id = max(map(lambda d: d['version'], response['data']))
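
The loop above stops after five iterations for brevity. A real synchronization would normally continue until the server returns an empty page. Below is a minimal sketch of that pattern. `fetch_page` is a hypothetical callable standing in for `make_request(build_measurement_url(...))`, and `fake_fetch_page` simulates the server so the sketch runs offline. The response shape (a `data` list whose items carry a `version`) follows the example above. That pages arrive in ascending version order is an assumption.

```python
def iter_new_packets(fetch_page, last_version):
    """Yield packets page by page until the server has nothing newer."""
    while True:
        packets = fetch_page(last_version)['data']
        if not packets:
            return
        yield from packets
        # The next request asks only for versions above the highest one seen.
        last_version = max(p['version'] for p in packets)


# Stand-in for the real API (an assumption made for this sketch).
SERVER = [{'version': v} for v in (3, 5, 8, 9, 12)]


def fake_fetch_page(version, limit=2):
    newer = [p for p in SERVER if p['version'] > version]
    return {'data': newer[:limit]}


versions = [p['version'] for p in iter_new_packets(fake_fetch_page, 0)]
print(versions)
```

Because each request uses a strictly greater version than the last, the loop always makes progress and ends on the first empty page.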

Station objects

Data from devices is passed on as received. Unlike post objects, it is transmitted without averaging, so synchronization is performed in ascending order of packet identifier.

import os
import json
import datetime

import requests

station_list_url = 'https://mycityair.ru/harvester/v2/Stations'
api_key = os.getenv('CITYAIR_TOKEN') # or set your api key directly

assert api_key, 'API key is empty'

headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {api_key}',
}


def make_request(url):
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # raise on HTTP 4xx/5xx responses
    return response.json()


def pretty_print(data: dict):
    formatted_response = json.dumps(data, indent=2, ensure_ascii=False)
    print(formatted_response)


def build_measurement_url(station_id, start_date, end_date):
    # Use the function's own parameters rather than the module-level variables.
    return f'https://mycityair.ru/harvester/v2/Stations/{station_id}/measurements?' \
           f'date__gt={start_date.isoformat()}&' \
           f'date__lt={end_date.isoformat()}'


stations = make_request(station_list_url)
assert len(stations) > 0, 'At least one station is required'
station = stations[0]
print(f'Got station {station["id"]}:{station["name"]}')

hour = datetime.timedelta(hours=1)
start_date = datetime.datetime.utcnow() - datetime.timedelta(hours=300)
end_date = start_date + hour


for i in range(5):  # do five sync iterations
    url = build_measurement_url(station['id'], start_date, end_date)
    response = make_request(url)
    print(f'Iteration #{i}')
    if len(response['data']) == 0:  # nothing was returned for this window
        print("No new data on server")
        break
    pretty_print(response['data'])
    for packet in response['data']:
        # save data
        pass
    # move the one-hour window forward
    start_date += hour
    end_date += hour
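
The window arithmetic in the loop above can be isolated into a small helper. This is a sketch only: `iter_windows` is a hypothetical name, not part of the API, and the dates are sample values. Note that the query uses the strict bounds `date__gt` and `date__lt`, so a packet stamped exactly on a window boundary might fall outside both neighbouring windows. Whether that happens depends on API behaviour that this page does not specify.

```python
import datetime


def iter_windows(start, end, step):
    """Yield consecutive (window_start, window_end) pairs covering start..end."""
    current = start
    while current < end:
        # The last window is shortened so it never runs past end.
        window_end = min(current + step, end)
        yield current, window_end
        current = window_end


start = datetime.datetime(2024, 1, 1, 0, 0)
end = datetime.datetime(2024, 1, 1, 2, 30)
for window_start, window_end in iter_windows(start, end, datetime.timedelta(hours=1)):
    print(window_start.isoformat(), window_end.isoformat())
```

Each pair can be passed to `build_measurement_url` in place of the manually advanced `start_date` and `end_date`.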