#!/usr/bin/env python3
# encoding: utf-8
"""
add-on_manager.py -- a command-line client for the Wesnoth add-on server

This tool is mainly intended for user-made content authors and maintainers.
It can be used to manage the WML content on the Wesnoth add-on server.
Available functions include listing, downloading, uploading, and deleting
add-ons.
"""

import glob
import os.path
import re
import shutil
import sys
import time
from subprocess import Popen

import wesnoth.wmlparser3 as wmlparser
from wesnoth.campaignserver_client import CampaignClient

if __name__ == "__main__":
    import argparse
    try:
        import psyco
        psyco.full()
    except ImportError:
        pass

    # ------------------------------------------------------------------
    # Command-line interface
    # ------------------------------------------------------------------
    argumentparser = argparse.ArgumentParser()
    argumentparser.add_argument("-a", "--address",
        help="specify server address",
        default="add-ons.wesnoth.org")
    argumentparser.add_argument("--html",
        help="Output a HTML overview into the given directory.",)
    argumentparser.add_argument("-p", "--port",
        help="specify server port or BfW version (%s)" % " or ".join(
            [x[1] for x in CampaignClient.portmap]),
        default=CampaignClient.portmap[0][0])
    argumentparser.add_argument("-l", "--list",
        help="list available add-ons",
        action="store_true",)
    argumentparser.add_argument("-w", "--wml",
        help="when listing add-ons, list the raw wml",
        action="store_true",)
    argumentparser.add_argument("-C", "--color",
        help="use colored WML output",
        action="store_true",)
    argumentparser.add_argument("-c", "--campaigns-dir",
        help="directory where add-ons are stored",
        default=".")
    argumentparser.add_argument("-P", "--password",
        help="password to use")
    argumentparser.add_argument("-d", "--download",
        help="download the named add-on; " +
        "name may be a Python regexp matched against all add-on names " +
        "(specify the path where to put it with -c, " +
        "current directory will be used by default)")
    argumentparser.add_argument("-T", "--type",
        help="Type of addons to download, e.g. 'era' or 'campaign'.")
    argumentparser.add_argument("-t", "--tar",
        help="When used together with --download, create tarballs of any " +
        "downloaded addons and put into the specified directory.")
    argumentparser.add_argument("--pbl",
        help="override standard PBL location")
    argumentparser.add_argument("-u", "--upload",
        help="Upload an add-on. " +
        "UPLOAD should be either the name of an add-on subdirectory," +
        "(in which case the client looks for _server.pbl beneath it) " +
        "or a path to the .pbl file (in which case the name of the " +
        "add-on subdirectory is the name of the path with .pbl removed)")
    argumentparser.add_argument("-s", "--status",
        help="Display the status of addons installed in the given " +
        "directory.")
    argumentparser.add_argument("-f", "--update",
        help="Update all installed add-ons in the given directory. " +
        "This works by comparing the _info.cfg file in each addon directory " +
        "with the version on the server.")
    argumentparser.add_argument("-V", "--verbose",
        help="be even more verbose for everything",
        action="store_true",)
    argumentparser.add_argument("-r", "--remove",
        help="remove the named add-on from the server, " +
        "set the password -P")
    argumentparser.add_argument("-R", "--raw-download",
        action="store_true",
        help="download as a binary WML packet")
    argumentparser.add_argument("--url",
        help="When used with --html, " +
        "a download link will be added for each campaign, with the given " +
        "base URL.")
    argumentparser.add_argument("-U", "--unpack",
        help="unpack the file UNPACK as a binary WML packet " +
        "(specify the add-on path with -c)")
    argumentparser.add_argument("--change-passphrase", nargs=3,
        metavar=("ADD-ON","OLD","NEW"),
        help="Change the passphrase for ADD-ON from OLD to NEW")
    args = argumentparser.parse_args()

    # -p accepts either a numeric port or a BfW version string that is
    # looked up in CampaignClient.portmap.  str() keeps the check working
    # even if the portmap default is not a string.
    port = args.port
    if not str(args.port).isdigit():
        for (portnum, version) in CampaignClient.portmap:
            if args.port == version:
                port = portnum
                break
        else:
            sys.stderr.write("Unknown BfW version %s\n" % args.port)
            sys.exit(1)

    # Append the port unless the user already supplied host:port.
    address = args.address
    if not ":" in address:
        address += ":" + str(port)

    # ------------------------------------------------------------------
    # Helpers (these rely on the script-level names `cs` and `args`)
    # ------------------------------------------------------------------
    def get(name, title, version, type, uploads, dependencies, cdir):
        """
        Download the add-on `name` from the server and install it below `cdir`.

        With --raw-download the received packet is written unmodified to a
        file called `name` in the current directory.  Otherwise the packet is
        decoded and unpacked into `cdir`, an _info.cfg recording `title`,
        `version`, `type`, `uploads` and `dependencies` is written into the
        add-on directory, and, if --tar was given, a tarball is created.
        """
        mythread = cs.get_campaign_raw_async(name)

        # Poll once per second and report progress whenever it changed.
        pcounter = 0
        while not mythread.event.isSet():
            mythread.event.wait(1)
            if pcounter != cs.counter:
                print("%s: %d/%d" % (name, cs.counter, cs.length))
                pcounter = cs.counter

        if args.raw_download:
            # The packet is binary (see the -R help text); the Python 2
            # `file` builtin no longer exists, so use open() in binary mode.
            with open(name, "wb") as rawfile:
                rawfile.write(mythread.data)
            return

        decoded = cs.decode(mythread.data)
        dirname = os.path.join(cdir, name)
        oldcfg_path = os.path.join(cdir, name + ".cfg")

        # Try to remove old campaign in case it exists.
        shutil.rmtree(dirname, True)
        try:
            os.remove(oldcfg_path)
        except OSError:
            pass

        print("Unpacking %s..." % name)
        cs.unpackdir(decoded, cdir, verbose=args.verbose)

        info = os.path.join(dirname, "_info.cfg")
        infowml = """#
# File automatically generated by Wesnoth to keep track
# of version information on installed add-ons. DO NOT EDIT!
#
[info]
\tdependencies="%s"
\ttitle="%s"
\ttype="%s"
\tuploads=%s
\tversion="%s"
[/info]
"""
        # Best effort: a failure to write the info file is ignored.
        try:
            with open(info, "w") as f:
                f.write(infowml %
                    (dependencies, title, type, uploads, version))
        except OSError:
            pass
        print_messages(decoded)

        if args.tar:
            try:
                os.mkdir(args.tar)
            except OSError:
                pass
            tarname = args.tar + "/" + name + ".tar.bz2"
            command = ["tar", "cjf", tarname, "-C", cdir, name]
            # Include the old-style external .cfg when one exists.
            if os.path.isfile(oldcfg_path):
                command.append(name + ".cfg")
            if args.verbose:
                sys.stderr.write("Creating tarball with command: %s\n" %
                    " ".join(command))
            Popen(command)

    def print_messages(data):
        """Print the text of every [message] and [error] tag in `data`."""
        for message in data.get_all(tag = "message") + data.get_all(tag = "error"):
            print(message.get_text_val("message"))

    def parse_wml_file(name):
        """Parse the WML file `name` and return the parser's root node."""
        p = wmlparser.Parser()
        p.parse_file(name)
        return p.root

    def get_info(name):
        """
        Get info for a locally installed add-on. It expects a direct path
        to the _info.cfg file.

        Returns an (uploads, version) tuple, or (None, None) if the file
        does not exist.
        """
        if not os.path.exists(name):
            return None, None
        info_tag = parse_wml_file(name).get_all(tag = "info")[0]
        uploads = info_tag.get_text_val("uploads", "")
        version = info_tag.get_text_val("version", "")
        return uploads, version

    # ------------------------------------------------------------------
    # Action dispatch
    # ------------------------------------------------------------------
    campaign_list = None

    if args.list:
        cs = CampaignClient(address)
        campaign_list = data = cs.list_campaigns()
        if data:
            campaigns = data.get_all(tag = "campaigns")[0]
            if args.wml:
                for campaign in campaigns.get_all(tag = "campaign"):
                    print(campaign.debug())
            else:
                # Minimum column widths; widened below to fit the data.
                column_sizes = [10, 5, 10, 7, 8, 8, 10, 5, 10, 13]
                columns = [["type", "name", "title", "author",
                    "version", "uploads", "downloads",
                    "size", "timestamp", "translate"]]
                for campaign in campaigns.get_all(tag = "campaign"):
                    column = [
                        campaign.get_text_val("type", "?"),
                        campaign.get_text_val("name", "?"),
                        campaign.get_text_val("title", "?"),
                        campaign.get_text_val("author", "?"),
                        campaign.get_text_val("version", "?"),
                        campaign.get_text_val("uploads", "?"),
                        campaign.get_text_val("downloads", "?"),
                        campaign.get_text_val("size", "?"),
                        time.ctime(int(campaign.get_text_val("timestamp", "0"))),
                        campaign.get_text_val("translate", "?")]
                    columns.append(column)
                    for i, s in enumerate(column_sizes):
                        if 1 + len(column[i]) > s:
                            column_sizes[i] = 1 + len(column[i])
                for c in columns:
                    for i, f in enumerate(c):
                        sys.stdout.write(f.ljust(column_sizes[i]))
                    sys.stdout.write("\n")
            print_messages(data)
        else:
            sys.stderr.write("Could not connect.\n")

    elif args.download:
        cs = CampaignClient(address)
        fetchlist = []
        campaign_list = data = cs.list_campaigns()
        if data:
            campaigns = data.get_all(tag = "campaigns")[0]
            for campaign in campaigns.get_all(tag = "campaign"):
                name = campaign.get_text_val("name", "?")
                title = campaign.get_text_val("title")
                addon_type = campaign.get_text_val("type", "")
                version = campaign.get_text_val("version", "")
                uploads = campaign.get_text_val("uploads", "")
                dependencies = campaign.get_text_val("dependencies", "")
                entry = (name, title, version, addon_type, uploads,
                    dependencies)
                # A pattern without regexp metacharacters is treated as an
                # exact add-on name; anything else is searched as a regexp.
                if re.escape(args.download).replace("\\_", "_") == args.download:
                    if name == args.download:
                        fetchlist.append(entry)
                elif not args.type or args.type == addon_type:
                    if re.search(args.download, name):
                        fetchlist.append(entry)

        for name, title, version, addon_type, uploads, dependencies in fetchlist:
            info = os.path.join(args.campaigns_dir, name, "_info.cfg")
            local_uploads, local_version = get_info(info)
            if uploads != local_uploads:
                # The uploads > local_uploads likely means a server reset
                # NOTE(review): both values are strings here, so this is a
                # lexicographic comparison -- confirm that is intended.
                if version != local_version or uploads > local_uploads:
                    get(name, title, version, addon_type, uploads,
                        dependencies, args.campaigns_dir)
                else:
                    print("Not downloading", name,
                        "as the version already is", local_version,
                        "(The add-on got re-uploaded.)")
            else:
                if args.verbose:
                    print("Not downloading", name,
                        "because it is already up-to-date.")

    elif args.unpack:
        cs = CampaignClient(address)
        # The packet is binary (see the -U help text); the Python 2 `file`
        # builtin no longer exists, so use open() in binary mode.
        with open(args.unpack, "rb") as packet:
            data = packet.read()
        decoded = cs.decode(data)
        print("Unpacking %s..." % args.unpack)
        cs.unpackdir(decoded, args.campaigns_dir, verbose=True)

    elif args.remove:
        cs = CampaignClient(address)
        data = cs.delete_campaign(args.remove, args.password)
        print_messages(data)

    elif args.change_passphrase:
        cs = CampaignClient(address)
        data = cs.change_passphrase(*args.change_passphrase)
        print_messages(data)

    elif args.upload:
        cs = CampaignClient(address)
        if os.path.isdir(args.upload):
            # else basename returns an empty string
            args.upload = args.upload.rstrip("/")

            # New style with _server.pbl
            pblfile = os.path.join(args.upload, "_server.pbl")
            name = os.path.basename(args.upload)
            wmldir = args.upload
            cfgfile = None # _main.cfg will be uploaded with the rest
            ignfile = os.path.join(args.upload, "_server.ign")
        else:
            # Old style with external .pbl file
            pblfile = args.upload
            name = os.path.basename(args.upload)
            name = os.path.splitext(name)[0]
            wmldir = os.path.join(os.path.dirname(args.upload), name)
            cfgfile = args.upload.replace(".pbl", ".cfg")
            ignfile = args.upload.replace(".pbl", ".ign")

        if args.pbl:
            pblfile = args.pbl

        pbl = parse_wml_file(pblfile)
        if os.path.exists(ignfile):
            with open(ignfile) as ignhandle:
                ign = ignhandle.readlines()
            # strip line endings and whitespace
            ign = [i.strip() for i in ign if i.strip()]
        else:
            # Default ignore patterns used when no .ign file is present.
            ign = [
                ".*",
                ".*/",
                "#*#",
                "*~",
                "*-bak",
                "*.swp",
                "*.pbl",
                "*.ign",
                "_info.cfg",
                "*.exe",
                "*.bat",
                "*.cmd",
                "*.com",
                "*.scr",
                "*.sh",
                "*.js",
                "*.vbs",
                "*.o",
                "Thumbs.db",
                "*.wesnoth",
                "*.project"]

        mythread = cs.put_campaign_async(name, cfgfile, wmldir, ign, pbl)

        # Poll once per second and report progress whenever it changed.
        pcounter = 0
        while not mythread.event.isSet():
            mythread.event.wait(1)
            if cs.counter != pcounter:
                print("%d/%d" % (cs.counter, cs.length))
                pcounter = cs.counter
        print_messages(mythread.data)

    elif args.update or args.status:
        if args.status:
            cdir = args.status
        else:
            cdir = args.update
        dirs = glob.glob(os.path.join(cdir, "*"))
        dirs = [x for x in dirs if os.path.isdir(x)]
        cs = CampaignClient(address)
        campaign_list = data = cs.list_campaigns()
        if not data:
            sys.stderr.write("Could not connect to the add-on server.\n")
            sys.exit(-1)

        # Index the server's add-on list by name.
        campaigns = {}
        for c in data.get_all(tag = "campaigns")[0].get_all(tag = "campaign"):
            name = c.get_text_val("name")
            campaigns[name] = c

        for d in dirs:
            dirname = os.path.basename(d)
            if dirname in campaigns:
                info = os.path.join(d, "_info.cfg")
                stitle = campaigns[dirname].get_text_val("title", "")
                sversion = campaigns[dirname].get_text_val("version", "")
                stype = campaigns[dirname].get_text_val("type", "")
                srev = campaigns[dirname].get_text_val("uploads", "")
                sdeps = campaigns[dirname].get_text_val("dependencies", "")
                if os.path.exists(info):
                    lrev, lversion = get_info(info)
                    if not srev:
                        sys.stdout.write(" ? " + dirname + " - has no " +
                            "version info on the server.\n")
                    elif srev == lrev:
                        sys.stdout.write(" " + dirname +
                            " - is up to date.\n")
                    elif sversion == lversion:
                        sys.stdout.write(" # " + dirname +
                            " - is version " + sversion +
                            (" but you have revision %s not %s." +
                            " (The add-on got re-uploaded.)\n") %
                            (lrev, srev))
                        # NOTE(review): string comparison of revisions --
                        # confirm that lexicographic order is intended.
                        if srev > lrev: # server reset?
                            if args.update:
                                get(dirname, stitle, sversion, stype, srev,
                                    sdeps, cdir)
                    else:
                        sys.stdout.write(" * " + dirname + " - you have " +
                            "revision " + lrev + " but revision " + srev +
                            " is available.\n")
                        if args.update:
                            get(dirname, stitle, sversion, stype, srev,
                                sdeps, cdir)
                else:
                    sys.stdout.write(" ? " + dirname +
                        " - is installed but has no " +
                        "version info.\n")
                    if args.update:
                        get(dirname, stitle, sversion, stype, srev, sdeps,
                            cdir)
            else:
                sys.stdout.write(
                    " - %s - is installed but not on server.\n" % dirname)

    elif args.html:
        # Handled below so that --html can be combined with other actions.
        pass
    else:
        argumentparser.print_help()

    if args.html:
        # Reuse the list fetched by an earlier action when available.
        if not campaign_list:
            cs = CampaignClient(address)
            campaign_list = cs.list_campaigns()
            del cs
        if campaign_list:
            import addon_manager.html
            addon_manager.html.output(args.html, args.url, campaign_list)
        else:
            sys.stderr.write("Could not retrieve campaign list " +
                "for HTML output.\n")