wesnoth/data/tools/wesnoth_addon_manager

461 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
# encoding: utf-8
"""
add-on_manager.py -- a command-line client for the Wesnoth add-on server
This tool is mainly intended for user-made content authors and maintainers.
It can be used to manage the WML content on the Wesnoth add-on server.
Available functions include listing, downloading, uploading, and deleting
add-ons.
"""
import sys, os.path, re, time, glob, shutil
from subprocess import Popen
import wesnoth.wmlparser3 as wmlparser
from wesnoth.campaignserver_client import CampaignClient
from wesnoth import version
# Validation callback for the -u/--upload argument: checks that the given
# path exists on disk.
def valid_file_path(path):
    """Return ``path`` unchanged if it names an existing directory or file.

    Used as the argparse ``type=`` callable for ``--upload``.  When ``path``
    is neither a directory nor a regular file, an error line is written to
    stderr and the process exits with status 1 (``SystemExit`` is raised),
    so the function never returns in that case.
    """
    if os.path.isdir(path) or os.path.isfile(path):
        return path
    sys.stderr.write("No such file or directory: %s\n" % path)
    sys.exit(1)
if __name__ == "__main__":
    import argparse

    # Command-line interface.  Options are registered in a fixed order
    # because that order is what --help prints.
    argumentparser = argparse.ArgumentParser(prog="wesnoth_addon_manager")

    # -- general / connection options --------------------------------------
    argumentparser.add_argument(
        "--version",
        action="version",
        version="%(prog)s " + version.as_string,
    )
    argumentparser.add_argument(
        "-a", "--address",
        default="add-ons.wesnoth.org",
        help="specify server address",
    )
    argumentparser.add_argument(
        "--html",
        help="Output a HTML overview into the given directory.",
    )
    argumentparser.add_argument(
        "-p", "--port",
        # The default is the port of the first entry in the client's port map.
        default=CampaignClient.portmap[0][0],
        help="specify server port or BfW version (%s)" % " or ".join(
            entry[1] for entry in CampaignClient.portmap),
    )

    # -- listing and downloading --------------------------------------------
    argumentparser.add_argument(
        "-l", "--list",
        action="store_true",
        help="list available add-ons",
    )
    argumentparser.add_argument(
        "-w", "--wml",
        action="store_true",
        help="when listing add-ons, list the raw wml",
    )
    argumentparser.add_argument(
        "-c", "--campaigns-dir",
        default=".",
        help="directory where add-ons are stored",
    )
    argumentparser.add_argument(
        "-d", "--download",
        help=("download the named add-on; "
              "name may be a Python regexp matched against all add-on names "
              "(specify the path where to put it with -c, "
              "current directory will be used by default)"),
    )
    argumentparser.add_argument(
        "-T", "--type",
        help="Type of addons to download, e.g. 'era' or 'campaign'.",
    )
    argumentparser.add_argument(
        "-t", "--tar",
        help=("When used together with --download, create tarballs of any "
              "downloaded addons and put into the specified directory."),
    )

    # -- uploading ------------------------------------------------------------
    argumentparser.add_argument(
        "--pbl",
        help="override standard PBL location",
    )
    argumentparser.add_argument(
        "--pbl-key",
        action='append',
        nargs=2,
        metavar=("KEY", "VALUE"),
        help=("When uploading, override KEY with VALUE in _server.pbl. "
              "No changes are written to disk, only the upload is affected. "
              "This option only makes sense with --upload."),
    )
    argumentparser.add_argument(
        "-u", "--upload",
        type=valid_file_path,
        help=("Upload an add-on. "
              "UPLOAD is the name of an add-on subdirectory, which must "
              "contain a _server.pbl file"),
    )

    # -- local installation status --------------------------------------------
    argumentparser.add_argument(
        "-s", "--status",
        help=("Display the status of addons installed in the given "
              "directory."),
    )
    argumentparser.add_argument(
        "-f", "--update",
        help=("Update all installed add-ons in the given directory. "
              "This works by comparing the _info.cfg file in each addon directory "
              "with the version on the server."),
    )
    argumentparser.add_argument(
        "-V", "--verbose",
        action="store_true",
        help="be even more verbose for everything",
    )

    # -- server-side maintenance and miscellaneous ----------------------------
    argumentparser.add_argument(
        "-r", "--remove",
        nargs=2,
        metavar=("ADD-ON", "PASSPHRASE"),
        help="remove the named add-on from the server",
    )
    argumentparser.add_argument(
        "-R", "--raw-download",
        action="store_true",
        help="download as a binary WML packet",
    )
    argumentparser.add_argument(
        "--url",
        help=("When used with --html, "
              "a download link will be added for each campaign, with the given "
              "base URL."),
    )
    argumentparser.add_argument(
        "--datadir",
        help=("When used with --html, "
              "specifies the Wesnoth data dir where add-on icons will be copied "
              "from."),
    )
    argumentparser.add_argument(
        "-U", "--unpack",
        help=("unpack the file UNPACK as a binary WML packet "
              "(specify the add-on path with -c)"),
    )
    argumentparser.add_argument(
        "--change-passphrase",
        nargs=3,
        metavar=("ADD-ON", "OLD", "NEW"),
        help="Change the passphrase for ADD-ON from OLD to NEW",
    )
    argumentparser.add_argument(
        "--terms",  # no short option here
        action="store_true",
        help=("Retrieves and prints the upload terms from the server "
              "(without formatting)."),
    )
    argumentparser.add_argument(
        "-S", "--secure",
        action="store_true",
        help="Connect to the add-ons server using SSL/TLS encryption.",
    )
    argumentparser.add_argument(
        "-6", "--ipv6",
        action="store_true",
        help="Connect to the add-ons server using IPv6 connectivity.",
    )

    args = argumentparser.parse_args()
# Resolve the -p/--port value.  A purely numeric value is used as the port
# as given; anything else is treated as a BfW version name and looked up in
# CampaignClient.portmap, whose entries are (port, version) pairs.
port = args.port
if not args.port.isdigit():
    # The loop variable is deliberately not called "version", so the
    # imported ``version`` module is not rebound here.
    for portnum, bfw_version in CampaignClient.portmap:
        if args.port == bfw_version:
            port = portnum
            break
    else:
        # Not an active version: report whether it is a known but
        # deactivated one, or entirely unknown.  Both cases exit with 1.
        for _portnum, bfw_version in CampaignClient.deactivated:
            if args.port == bfw_version:
                sys.stderr.write("Server for BfW version %s is deactivated\n" % bfw_version)
                sys.exit(1)
        sys.stderr.write("Unknown BfW version %s\n" % args.port)
        sys.exit(1)

# An address that already contains ":" is used as given; otherwise the
# resolved port is appended.
address = args.address
if ":" not in address:
    address += ":" + str(port)
def get(name, title, version, type, uploads, dependencies, cdir):
    """Download add-on ``name`` from the server and install it below ``cdir``.

    Uses ``cs`` (the CampaignClient) and ``args`` (the parsed command line)
    from the enclosing script scope.

    With ``--raw-download`` the undecoded packet is written to a file called
    ``name`` in the current directory and nothing else is done.  Otherwise
    any previous copy (``cdir/name`` and ``cdir/name.cfg``) is removed, the
    packet is decoded and unpacked into ``cdir``, an ``_info.cfg`` recording
    ``dependencies``, ``title``, ``type``, ``uploads`` and ``version`` is
    written into the add-on directory, the server's messages are printed
    and, when ``--tar`` was given, a ``tar`` process is started to archive
    the add-on into that directory.
    """

    def _wml_quoted(value):
        # Inside a quoted WML value a literal double quote is written as two
        # double quotes (the same escaping the upload code applies to
        # --pbl-key values).  A missing value is stored as an empty string.
        if value is None:
            return ""
        return str(value).replace('"', '""')

    # Start the transfer and report progress whenever the counter changes.
    mythread = cs.get_campaign_raw_async(name)
    pcounter = 0
    while not mythread.event.is_set():
        mythread.event.wait(1)
        if pcounter != cs.counter:
            print("%s: %d/%d" % (name, cs.counter, cs.length))
            pcounter = cs.counter

    if args.raw_download:
        with open(name, "wb") as f:
            f.write(mythread.data)
        return

    decoded = cs.decode(mythread.data)
    dirname = os.path.join(cdir, name)
    oldcfg_path = os.path.join(cdir, name + ".cfg")

    # Try to remove old campaign in case it exists.
    shutil.rmtree(dirname, True)
    try:
        os.remove(oldcfg_path)
    except OSError:
        pass

    print("Unpacking %s..." % name)
    cs.unpackdir(decoded, cdir, verbose=args.verbose)

    info = os.path.join(dirname, "_info.cfg")
    infowml = """#
# File automatically generated by Wesnoth to keep track
# of version information on installed add-ons. DO NOT EDIT!
#
[info]
\tdependencies="%s"
\ttitle="%s"
\ttype="%s"
\tuploads=%s
\tversion="%s"
[/info]
"""
    try:
        # The context manager closes the file even if the write fails.
        with open(info, "w") as f:
            f.write(infowml % (_wml_quoted(dependencies), _wml_quoted(title),
                               _wml_quoted(type), uploads,
                               _wml_quoted(version)))
    except OSError as err:
        # Still best-effort (the download itself succeeded), but no longer
        # silent: without _info.cfg later status/update runs cannot tell
        # which revision is installed.
        sys.stderr.write("Could not write %s: %s\n" % (info, err))

    print_messages(decoded)

    if args.tar:
        try:
            os.mkdir(args.tar)
        except OSError:
            pass  # best effort, e.g. the directory may already exist
        tarname = args.tar + "/" + name + ".tar.bz2"
        # Archive the add-on directory plus the old-style NAME.cfg when the
        # unpacked add-on shipped one.
        members = [name]
        if os.path.isfile(oldcfg_path):
            members.append(name + ".cfg")
        if args.verbose:
            sys.stderr.write("Creating tarball with command: tar cjf %s -C %s %s\n" %
                             (tarname, cdir, " ".join(members)))
        # NOTE: the tar process is started but not waited for, as before.
        Popen(["tar", "cjf", tarname, "-C", cdir] + members)
def print_messages(data):
    """Print the text of every [message] and [error] child of ``data``.

    ``data`` is a parsed WML node providing ``get_all(tag=...)``; the
    "message" attribute of each returned child is printed on its own line,
    messages first and errors after them.  ``None`` (which this script
    treats as a failed server request) is accepted and prints nothing.
    """
    if data is None:
        return
    for message in data.get_all(tag="message") + data.get_all(tag="error"):
        print(message.get_text_val("message"))
def parse_wml_file(name):
    """Parse the WML file at path ``name`` and return the parser's root node."""
    wml_parser = wmlparser.Parser()
    wml_parser.parse_file(name)
    root_node = wml_parser.root
    return root_node
def parse_wml_text(text):
    """Parse the WML source held in ``text`` and return the parser's root node."""
    wml_parser = wmlparser.Parser()
    wml_parser.parse_text(text)
    root_node = wml_parser.root
    return root_node
def get_info(name):
    """
    Get info for a locally installed add-on. It expects a direct path
    to the _info.cfg file.

    Returns an ``(uploads, version)`` tuple.  Both items are ``None`` when
    the file does not exist.  When the file exists, a missing key yields an
    empty string, and so does a file that contains no [info] tag at all
    (instead of raising IndexError).
    """
    if not os.path.exists(name):
        return None, None
    info_tags = parse_wml_file(name).get_all(tag="info")
    if not info_tags:
        # Malformed or empty _info.cfg: report "no information" the same way
        # as missing keys, so callers see plain strings.
        return "", ""
    info = info_tags[0]
    uploads = info.get_text_val("uploads", "")
    version = info.get_text_val("version", "")
    return uploads, version
def fixup(column):
    """Return ``column`` with each newline and tab character replaced by its
    two-character backslash escape, so the value fits on one line of
    tab-separated output."""
    for raw, visible in (("\n", "\\n"), ("\t", "\\t")):
        column = column.replace(raw, visible)
    return column
# Add-on list fetched by one of the actions below (if any); kept so the
# --html output step does not have to fetch it again.
campaign_list = None


def _revision_newer(server_rev, local_rev):
    """Return True when ``server_rev`` is greater than ``local_rev``.

    The "uploads" revision counters arrive as strings; they are compared as
    integers so that e.g. "10" counts as newer than "9".  If either value is
    not a valid integer the plain string comparison is used as a fallback
    (a missing value is treated as an empty string).
    """
    try:
        return int(server_rev) > int(local_rev)
    except (TypeError, ValueError):
        return (server_rev or "") > (local_rev or "")


if args.list:
    # --list: print every add-on on the server, either as raw WML (--wml)
    # or as a tab-separated table with a header row.
    cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
    campaign_list = data = cs.list_campaigns()
    if data:
        campaigns = data.get_all(tag="campaigns")[0]
        if args.wml:
            for campaign in campaigns.get_all(tag="campaign"):
                print(campaign.debug())
        else:
            columns = [["type", "name", "title", "author",
                        "version", "uploads", "downloads",
                        "size", "timestamp", "translate"]]
            for campaign in campaigns.get_all(tag="campaign"):
                columns.append([
                    campaign.get_text_val("type", "?"),
                    campaign.get_text_val("name", "?"),
                    campaign.get_text_val("title", "?"),
                    campaign.get_text_val("author", "?"),
                    campaign.get_text_val("version", "?"),
                    campaign.get_text_val("uploads", "?"),
                    campaign.get_text_val("downloads", "?"),
                    campaign.get_text_val("size", "?"),
                    time.ctime(int(campaign.get_text_val("timestamp", "0"))),
                    campaign.get_text_val("translate", "?")])
            for c in columns:
                print(*map(fixup, c), sep="\t", end="\n")
        print_messages(data)
    else:
        sys.stderr.write("Could not connect.\n")
elif args.download:
    # --download: fetch the add-on with exactly the given name, or -- when
    # the argument contains regexp syntax -- every add-on whose name matches
    # it (optionally restricted by --type).
    cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
    fetchlist = []
    campaign_list = data = cs.list_campaigns()
    if data:
        # Both the literal-name test and the compiled pattern depend only on
        # the command line, so compute them once instead of per add-on.
        literal_name = (
            re.escape(args.download).replace("\\_", "_") == args.download)
        name_pattern = None if literal_name else re.compile(args.download)

        campaigns = data.get_all(tag="campaigns")[0]
        for campaign in campaigns.get_all(tag="campaign"):
            name = campaign.get_text_val("name", "?")
            title = campaign.get_text_val("title", "")
            addon_type = campaign.get_text_val("type", "")
            addon_version = campaign.get_text_val("version", "")
            uploads = campaign.get_text_val("uploads", "")
            dependencies = campaign.get_text_val("dependencies", "")
            entry = (name, title, addon_version, addon_type, uploads,
                     dependencies)
            if literal_name:
                if name == args.download:
                    fetchlist.append(entry)
            elif not args.type or args.type == addon_type:
                if name_pattern.search(name):
                    fetchlist.append(entry)

        for (name, title, addon_version, addon_type, uploads,
             dependencies) in fetchlist:
            info = os.path.join(args.campaigns_dir, name, "_info.cfg")
            local_uploads, local_version = get_info(info)
            if uploads != local_uploads:
                # The uploads > local_uploads likely means a server reset
                if (addon_version != local_version
                        or _revision_newer(uploads, local_uploads)):
                    get(name, title, addon_version, addon_type, uploads,
                        dependencies, args.campaigns_dir)
                else:
                    print("Not downloading", name,
                          "as the version already is", local_version,
                          "(The add-on got re-uploaded.)")
            else:
                if args.verbose:
                    print("Not downloading", name,
                          "because it is already up-to-date.")
    else:
        # Same diagnostic as --list; previously a failed list request made
        # --download do nothing without any message.
        sys.stderr.write("Could not connect.\n")
elif args.unpack:
    # --unpack: decode a previously saved binary WML packet into
    # the --campaigns-dir directory.
    cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
    with open(args.unpack, "rb") as f:
        data = f.read()
    decoded = cs.decode(data)
    print("Unpacking %s..." % args.unpack)
    cs.unpackdir(decoded, args.campaigns_dir, verbose=True)
elif args.remove:
    # --remove ADD-ON PASSPHRASE
    cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
    data = cs.delete_campaign(*args.remove)
    if data is None:
        print("Failed to remove add-on '{}'.".format(args.remove[0]), file=sys.stderr)
    else:
        print_messages(data)
elif args.change_passphrase:
    # --change-passphrase ADD-ON OLD NEW
    cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
    data = cs.change_passphrase(*args.change_passphrase)
    if data is None:
        # Mirror the --remove branch rather than failing inside
        # print_messages when no response is available.
        print("Failed to change the passphrase for add-on '{}'.".format(
            args.change_passphrase[0]), file=sys.stderr)
    else:
        print_messages(data)
elif args.upload:
    # --upload: send the add-on directory to the server, using its
    # _server.pbl (or the --pbl override) and its _server.ign ignore list.
    cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
    if os.path.isdir(args.upload):
        # else basename returns an empty string
        args.upload = args.upload.rstrip("/")

        # Only the new style with _server.pbl is supported
        pbl_file_name = os.path.join(args.upload, "_server.pbl")

        name = os.path.basename(os.path.realpath(args.upload))
        wmldir = args.upload
        ignfile = os.path.join(args.upload, "_server.ign")
    else:
        raise RuntimeError("the -u/--upload path must be a directory")

    if args.pbl:
        pbl_file_name = args.pbl

    with open(pbl_file_name, 'r') as pbl_file:
        pbl_text = pbl_file.read()
    # --pbl-key overrides are appended to the PBL text before parsing; the
    # file on disk is not modified.
    for key, value in args.pbl_key or []:
        if not re.match("^[a-zA-Z]+[_a-zA-Z]*$", key):
            raise ValueError("non-standard --pbl-key " + key)
        pbl_text = pbl_text + '\n' + key + '="' + value.replace('"', '""') + '"'
    pbl = parse_wml_text(pbl_text)

    if os.path.exists(ignfile):
        # Strip line endings and whitespace and drop blank lines; the
        # context manager makes sure the file is closed again.
        with open(ignfile) as ign_file:
            ign = [line.strip() for line in ign_file if line.strip()]
    else:
        # Default ignore patterns used when the add-on has no _server.ign.
        ign = [
            ".*",
            ".*/",
            "#*#",
            "*~",
            "*-bak",
            "*.swp",
            "*.pbl",
            "*.ign",
            "_info.cfg",
            "*.exe",
            "*.bat",
            "*.cmd",
            "*.com",
            "*.scr",
            "*.sh",
            "*.js",
            "*.vbs",
            "*.o",
            "*.ini",
            "Thumbs.db",
            "*.wesnoth",
            "*.project",
            "__MACOSX/"]

    mythread = cs.put_campaign_async(name, wmldir, ign, pbl)
    pcounter = 0
    while not mythread.event.is_set():
        mythread.event.wait(1)
        if cs.counter != pcounter:
            print("%d/%d" % (cs.counter, cs.length))
            pcounter = cs.counter
    print_messages(mythread.data)
elif args.update or args.status:
    # --status / --update: compare every add-on directory below the given
    # directory with the server's list; --update also re-downloads the ones
    # that are out of date.
    if args.status:
        cdir = args.status
    else:
        cdir = args.update
    dirs = [x for x in glob.glob(os.path.join(cdir, "*"))
            if os.path.isdir(x)]
    cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
    campaign_list = data = cs.list_campaigns()
    if not data:
        sys.stderr.write("Could not connect to the add-on server.\n")
        sys.exit(-1)
    # Index the server's add-ons by name.
    campaigns = {
        c.get_text_val("name"): c
        for c in data.get_all(tag="campaigns")[0].get_all(tag="campaign")}
    for d in dirs:
        dirname = os.path.basename(d)
        if dirname in campaigns:
            info = os.path.join(d, "_info.cfg")
            sinfo = campaigns[dirname]
            stitle = sinfo.get_text_val("title", "")
            sversion = sinfo.get_text_val("version", "")
            stype = sinfo.get_text_val("type", "")
            srev = sinfo.get_text_val("uploads", "")
            sdeps = sinfo.get_text_val("dependencies", "")
            if os.path.exists(info):
                lrev, lversion = get_info(info)
                if not srev:
                    sys.stdout.write(" ? " + dirname + " - has no " +
                                     "version info on the server.\n")
                elif srev == lrev:
                    sys.stdout.write(" " + dirname +
                                     " - is up to date.\n")
                elif sversion == lversion:
                    sys.stdout.write(" # " + dirname + " - is version " +
                                     sversion + (" but you have revision %s not %s." +
                                                 " (The add-on got re-uploaded.)\n") %
                                     (lrev, srev))
                    if _revision_newer(srev, lrev):  # server reset?
                        if args.update:
                            get(dirname, stitle, sversion, stype, srev, sdeps, cdir)
                else:
                    sys.stdout.write(" * " + dirname + " - you have " +
                                     "revision " + lrev + " but revision " + srev +
                                     " is available.\n")
                    if args.update:
                        get(dirname, stitle, sversion, stype, srev, sdeps, cdir)
            else:
                sys.stdout.write(" ? " + dirname +
                                 " - is installed but has no " +
                                 "version info.\n")
                if args.update:
                    get(dirname, stitle, sversion, stype, srev, sdeps, cdir)
        else:
            sys.stdout.write(" - %s - is installed but not on server.\n" %
                             dirname)
elif args.terms:
    # --terms: print the server's upload terms as plain text.
    cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
    data = cs.get_terms()
    text = data.get_all(tag="message")[0].get_text_val("message", "")
    # remove Pango markup
    text = re.sub(r"<.*?>", "", text)
    sys.stdout.write(text + "\n")
elif args.html:
    # --html on its own is a valid action; it is handled after this chain.
    pass
else:
    argumentparser.print_help()
if args.html:
    # Reuse the add-on list left in campaign_list when there is one;
    # otherwise request it from the server now with a short-lived client.
    if not campaign_list:
        cs = CampaignClient(address, secure=args.secure, ipv6=args.ipv6)
        campaign_list = cs.list_campaigns()
        del cs
    if not campaign_list:
        sys.stderr.write("Could not retrieve campaign list for HTML output.\n")
    else:
        # Imported only when HTML output was actually requested.
        import addon_manager.html
        addon_manager.html.output(
            args.html, args.url, args.datadir, campaign_list)