Files
2022-09-02 18:53:39 -07:00

204 lines
6.0 KiB
Python

import sys
import os
import shutil
import json
import time
from pathlib import Path, PurePath
import requests
from bs4 import BeautifulSoup
root_dir = Path()
current_versions_file = Path()
repo_file = Path()
work_dir = Path()
api_headers = {}
versions = {}
repo = {}
def set_paths():
global root_dir, current_versions_file, repo_file, work_dir
if len(sys.argv) < 3:
raise Exception('Bad set of args provided to updater.py')
root_dir = Path(sys.argv[2])
current_versions_file = root_dir / 'current_versions.json'
repo_file = root_dir / 'repo.json'
work_dir = root_dir / 'tmp_workdir'
def maybe_set_api_key():
global api_headers
try:
token_file = root_dir / '.github_token'
with open(token_file, 'r') as fd:
api_key = fd.read()
api_headers = {'Authorization': f'token {api_key}'}
except:
print('No GitHub API token provided. This is probably fine.')
def download_with_progress(url, destination_fn):
block_size_b = 512
count = 1
r = requests.get(url, stream=True)
try:
total_size_mb = int(r.headers.get('content-length')) / 1024 / 1024
except:
total_size_mb = 0
with open(destination_fn, 'wb') as fd:
for chunk in r.iter_content(chunk_size=block_size_b):
fd.write(chunk)
written_mb = (count * block_size_b) / 1024 / 1024
if total_size_mb > 0:
pct_complete = (written_mb / total_size_mb) * 100
else:
pct_complete = 0
print(f'Transferred {written_mb:.2f}MB / {total_size_mb:.2f}MB ({pct_complete:.0f}%)', end='\r', flush=True)
count += 1
print(f'Downloaded {url}')
def fetch_repo_list():
print('Fetching latest repo list...')
repo_url = 'https://raw.githubusercontent.com/rivergillis/update-pocket/main/repo.json'
try:
download_with_progress(repo_url, repo_file)
except:
print('Error fetching latest repos. Using version on disk.')
def load_repo_list():
file = open(repo_file, 'r')
global repo
repo = json.load(file)
def get_versions():
try:
file = open(current_versions_file, 'r')
global versions
versions = json.load(file)
except:
print('No current versions metadata found. Updating everything.')
def write_versions():
with open(current_versions_file, 'w') as fp:
json.dump(versions, fp)
def update_firmware():
print('Fetching latest firmware version...')
firmware_page = 'https://www.analogue.co/support/pocket/firmware/latest'
response = requests.get(firmware_page)
html = BeautifulSoup(response.text, 'html.parser')
bin_url = None
for link in html.find_all('a'):
href = link.get('href')
if href.endswith('.bin'):
bin_url = href
break
if bin_url is None:
print('ERROR: Unable to find latest firmware update.')
return
basename = bin_url.split('/')[-1]
if versions.get('firmware', '') == basename:
print('Firmware up to date.')
return
# Remove older firmware updates
def remove_old_firmware(p):
if not p.is_file():
return
if p.name.startswith('pocket_firmware') and p.name != basename:
print(f'Removing old firmware file {p.name}')
p.unlink()
for p in root_dir.glob('*.bin'):
remove_old_firmware(p)
print(f'Newer pocket firmware found: {basename}, updating...')
destination_fn = root_dir / basename
download_with_progress(bin_url, destination_fn)
versions['firmware'] = basename
def maybe_update_bios(bios_item):
bios_destination_fn = root_dir / bios_item['path']
if bios_destination_fn.is_file():
return
basename = PurePath(bios_destination_fn).name
print(f'Bios for {basename} not detected. Installing...')
bios_url = bios_item['url']
download_with_progress(bios_url, bios_destination_fn)
def update_repo(item):
repo_name = item['repo']
try:
releases_url = f'https://api.github.com/repos/{repo_name}/releases/latest'
r = requests.get(releases_url, headers=api_headers)
resp_json = r.json()
release_url = resp_json['url']
except:
# Couldn't get latest release (bad tagging?), just use the first in the list
releases_url = f'https://api.github.com/repos/{repo_name}/releases'
r = requests.get(releases_url, headers=api_headers)
resp_json = r.json()[0]
release_url = resp_json['url']
item_path = item.get('path', '.')
if versions.get(repo_name, '') == release_url:
print(f'{repo_name} up to date.')
return
print(f'New version found for {repo_name}, updating...')
for asset in resp_json['assets']:
basename = asset['name']
pkg_url = asset['browser_download_url']
workdir_dest_fn = work_dir / basename
download_with_progress(pkg_url, workdir_dest_fn)
extract_dir = root_dir / item_path
extract_dir.mkdir(parents=True, exist_ok=True)
try:
shutil.unpack_archive(workdir_dest_fn, extract_dir)
except shutil.ReadError:
print(f"Couldn't unpack {basename}, looking for other assets.")
continue
if item.get('bios', None) is not None:
for bios_item in item['bios']:
maybe_update_bios(bios_item)
versions[repo_name] = release_url
def update_repos():
print('Fetching openFPGA core updates...')
for item in repo:
if item.get('repo', None) is None:
continue
try:
update_repo(item)
except BaseException as e:
print(f"Unable to update {item['repo']}: {e}")
def main():
set_paths()
maybe_set_api_key()
work_dir.mkdir(parents=True, exist_ok=True)
get_versions()
update_firmware()
fetch_repo_list()
load_repo_list()
update_repos()
write_versions()
shutil.rmtree(work_dir)
print('Update complete! Go play some games.')
if __name__ == '__main__':
main()