Remove Python AI, resolving bug #13047 and #13048.

This commit is contained in:
Daniel Franke 2009-02-23 07:05:50 +00:00
parent 29161a19b7
commit d2d21276a2
29 changed files with 10 additions and 6403 deletions

View File

@ -61,7 +61,6 @@ set(LOCALEDIR "translations" CACHE STRING "change the name of the locale data di
set(PREFERENCES_DIR "" CACHE STRING "Use a non-default preferences directory (.wesnoth on unix)")
#Game options
option(ENABLE_PYTHON "Enable in-game python extensions" ON)
option(ENABLE_FRIBIDI "Enable FriBIDi support" ON)
set(GUI "normal" CACHE STRING "Set for GUI reductions for resolutions down to 320x240 (PDAs) (normal|tiny)")
@ -132,12 +131,6 @@ if(GUI STREQUAL "tiny")
add_definitions(-DUSE_TINY_GUI)
endif(GUI STREQUAL "tiny")
if(ENABLE_PYTHON AND PYTHON_LIBRARIES)
add_definitions(-DHAVE_PYTHON)
elseif(ENABLE_PYTHON AND NOT PYTHON_LIBRARIES)
message("Could not find Python. Disabling Python support.")
endif()
if(ENABLE_FRIBIDI AND FRIBIDI_LIBRARIES)
add_definitions(-DHAVE_FRIBIDI)
elseif(ENABLE_FRIBIDI AND NOT FRIBIDI_LIBRARIES)

View File

@ -70,11 +70,6 @@ then
3. Scons build
You will need the following tools:
python >=2.4
scons
SCons can be found at http://www.scons.org/ .
Simply type 'scons' in the top-level directory to build the game and
@ -190,7 +185,6 @@ Optional Features:
--enable-profile build=profile
--enable-tests (See --help documentation on choice of targets)
--enable-static static=yes
--disable-python python=no
--enable-python-install N/A
--enable-lite N/A
--enable-tinygui gui=tiny

View File

@ -185,17 +185,6 @@ ACLOCAL_AMFLAGS = -I m4
EXTRA_DIST = config/config.rpath config/mkinstalldirs config/py-compile
if PYTHON_INSTALL
pkgpython_PYTHON = data/tools/wesnoth/wmltools.py \
data/tools/wesnoth/libsvn.py \
data/tools/wesnoth/wescamp.py \
data/tools/wesnoth/wmldata.py \
data/tools/wesnoth/wmliterator.py \
data/tools/wesnoth/wmlparser.py \
data/tools/wesnoth/campaignserver_client.py \
data/tools/wesnoth/__init__.py
endif
update-po:
@cd po && make $@ || exit $?
if MANUALUPDATE

View File

@ -73,7 +73,6 @@ opts.AddOptions(
BoolOption('prereqs','abort if prerequisites cannot be detected',True),
('program_suffix', 'suffix to append to names of installed programs',"$version_suffix"),
('version_suffix', 'suffix that will be added to default values of prefsdir, program_suffix and datadirname', ""),
BoolOption('python', 'Enable in-game python extensions.', True),
BoolOption('raw_sockets', 'Set to use raw receiving sockets in the multiplayer network layer rather than the SDL_net facilities', False),
BoolOption('forum_user_handler', 'Enable forum user handler in wesnothd', False),
BoolOption('pool_alloc', 'Enable custom pool malloc', False),
@ -252,9 +251,6 @@ if env["prereqs"]:
env.ParseConfig("mysql_config --libs --cflags")
env.Append(CPPDEFINES = ["HAVE_MYSQLPP"])
if env["python"]:
env["python"] = (float(sys.version[:3]) >= 2.4) and conf.CheckPython() or Warning("Python >= 2.4 not found. Python extensions will be disabled.")
env = conf.Finish()
test_env = env.Clone()

View File

@ -129,16 +129,6 @@ AC_ARG_ENABLE([static],
[static=$enableval],
[static=no])
AC_ARG_ENABLE([python],
AS_HELP_STRING([--disable-python], [disable Python support]),
[python=$enableval],
[python=yes])
AC_ARG_ENABLE([python_install],
AS_HELP_STRING([--enable-python-install], [enable installation of Python developer tools]),
[python_install=$enableval],
[python_install=no])
AC_ARG_ENABLE([tinygui],
AS_HELP_STRING([--enable-tinygui], [enable GUI reductions for resolutions down to 320x240 (PDAs), resize images before installing]),
[tinygui=$enableval],
@ -283,18 +273,6 @@ AC_ARG_ENABLE([internal-data],
[internaldata=$enableval],
[internaldata=no])
if test "x$game" = "xno"
then
python=no
AC_MSG_WARN([*** Game build disabled, suppressing Python support.])
fi
if test "x$python" = "xno"
then
python_install=no
AC_MSG_WARN([*** Python support disabled, suppressing installation of Python tools.])
fi
AC_ARG_ENABLE([raw-sockets],
AS_HELP_STRING([--enable-raw-sockets], [use raw receiving sockets in the multiplayer network layer rather than the SDL_net facilities]),
[raw_sockets=$enableval],
@ -311,7 +289,6 @@ then
fi
AM_CONDITIONAL([STATIC], [test x$static = xyes])
AM_CONDITIONAL([PYTHON_INSTALL], [test x$python_install = xyes])
AM_CONDITIONAL([GAME], [test x$game = xyes])
AM_CONDITIONAL([SERVER], [test x$server = xyes])
AM_CONDITIONAL([CAMPAIGNSERVER], [test x$campaignserver = xyes])
@ -424,80 +401,6 @@ AC_SUBST([FRIBIDI_CFLAGS])
AC_SUBST([FRIBIDI_LIBS])
AM_CONDITIONAL([FRIBIDI], [test "x$fribidifound" = xyes -a "x$fribidi" = xyes ])
# python
if test "x$python" = "xyes"; then
pythonfound=yes
if test "x$PYTHON" = "x"; then
AC_PATH_PROG(PYTHON, python, none)
fi
if test "x$PYTHON" = "xnone"; then
AC_MSG_WARN([*** Python interpreter not found, Python support disabled.])
pythonfound=no
fi
if test "x$pythonfound" = "xyes"; then
AC_MSG_CHECKING(Python version and location)
PYTHON_PREFIX=`$PYTHON -c "import sys; print sys.prefix"`
PYTHON_VERSION_MAJOR=[`$PYTHON -c "import sys; print '%d' % (sys.version_info[0]);"`]
PYTHON_VERSION_MINOR=[`$PYTHON -c "import sys; print '%d' % (sys.version_info[1]);"`]
PYTHON_VERSION="${PYTHON_VERSION_MAJOR}.${PYTHON_VERSION_MINOR}"
AC_MSG_RESULT([$PYTHON, $PYTHON_VERSION, $PYTHON_PREFIX])
AC_MSG_CHECKING(whether Python is at least 2.4)
if test $PYTHON_VERSION_MAJOR -lt 2 -o $PYTHON_VERSION_MAJOR -eq 2 -a $PYTHON_VERSION_MINOR -lt 4; then
AC_MSG_RESULT(no)
AC_MSG_WARN([*** Wesnoth requires at least Python 2.4, Python support disabled.])
pythonfound=no
fi
if test "x$pythonfound" = "xyes"; then
AC_MSG_RESULT(yes)
PYTHON_CFLAGS="-DHAVE_PYTHON -I$PYTHON_PREFIX/include/python$PYTHON_VERSION"
OLD_CPPFLAGS="$CPPFLAGS"
OLD_CXXFLAGS="$CXXFLAGS"
CPPFLAGS="$CPPFLAGS $PYTHON_CFLAGS"
CXXFLAGS="$CXXFLAGS $PYTHON_CFLAGS"
AC_CHECK_HEADER([Python.h],
[],
[AC_MSG_WARN([*** Python include files not found! You should install Python development package. Python support disabled]); pythonfound=no])
CPPFLAGS="$OLD_CPPFLAGS"
CXXFLAGS="$OLD_CXXFLAGS"
if test "x$pythonfound" = "xyes"; then
AC_SUBST([PYTHON_CFLAGS])
pythonfound=no
for pylibpath in '/usr/lib' $PYTHON_PREFIX/lib $PYTHON_PREFIX/lib/python$PYTHON_VERSION/config; do
eval `echo unset ac_cv_lib_python$PYTHON_VERSION'___'Py_Finalize | tr '.' '_'`
save_LIBS=$LIBS
LIBS="$LIBS -L$pylibpath"
AC_CHECK_LIB(python$PYTHON_VERSION, Py_Finalize, PYTHON_LIBS="-L$pylibpath -lpython$PYTHON_VERSION $PYTHON_DEPS"; pythonfound=yes,,$PYTHON_DEPS)
LIBS=$save_LIBS
if test "x$pythonfound" = "xyes"; then
break
fi
done
if test "x$pythonfound" != "xyes"; then
AC_MSG_WARN(*** Python development libraries required, Python support disabled)
fi
AC_SUBST([PYTHON_LIBS])
AC_SUBST(pkgpythondir)
if test "x$python_install" = "xyes"; then
pkgpythondir=$PYTHON_PREFIX"/lib/python"$PYTHON_VERSION"/site-packages/wesnoth"
fi
fi
fi
fi
fi
AM_CONDITIONAL([PYTHON], [test "x$pythonfound" = xyes -a "x$python" = xyes ])
# libpng-config
AC_PATH_PROGS([PNG_CONFIG], [libpng-config libpng12-config], [none])

View File

@ -1,29 +0,0 @@
# Copyright Greg Copeland, 2008 - 2009
# Released under GPL license for Wesnoth. See Wesnoth's
# licensing terms for this module's specific license.
# This file should only be called once per python instance. It
# sets up stderr to a per python instance file.
import os
import sys
import traceback
def initEnv( instName ):
"""
Initialize a python AI's environment. Call exactly once per
python instance. Call before an attempt to launch a python
AI is made.
"""
# Create an output filename for each
# python instance.
fn = "pyerr-%s.txt" % (str(instName))
# Override stderr to write to a file
try:
errFile = file( fn, "wb" )
sys.stderr = errFile
except IOError, e:
sys.stderr.write( "Python: Unable to create '%s'; '%s'." % (fn, str(e)) )

View File

@ -1,52 +0,0 @@
#!/usr/bin/env python
# Copyright Greg Copeland, 2008 - 2009
# Released under GPL license for Wesnoth. See Wesnoth's
# licensing terms for this module's specific license.
#
import os
import sys
import traceback
print "launch has been imported!"
def launch( script, restrict, importOnly=False ):
# Launch one of two possible environments
# If restrict arg is True, run inside the available
# restrictied python environment (safe.py). If restrict
# is False, then run without any type of restrictions at
# all. If importOnly is True, only import but don't attempt
# to execute.
try:
if restrict:
print "restricted environment detected - running parse/safe"
import safe
import parse
parse.paths = ""
code, context = parse.parse( script )
if not importOnly:
safe.safe_exec( code, context, restrict )
else:
print "unrestricted environment detected..."
__import__( script )
scrpt = sys.modules[ script ]
# Call our entry points
if not importOnly:
print "Running script in unrestricted environment..."
scrpt.turn()
print "Script has completed execution."
except:
err = str( traceback.format_exc() )
raise
print "launch has completed."

View File

@ -1,548 +0,0 @@
#!WPY
import ai as wesnoth
## Copyright 2006 by Michael Schmahl
## This code is available under the latest version of the GNU Public License.
## See COPYING for details. Some inspiration and code derived from "sample.py"
## by allefant.
##
## This is my attempt at a 'chess-like' AI. All moves are motivated by
## an underlying evaluation function. The actual eval function doesn't
## need to be coded, because moves can be scored and ranked based on the
## incremental change in the evaluation. Unlike a chess-playing program,
## though, this program does no lookahead, because the branching factor
## is prohibitively high (potentially in the thousands), and because then
## the script would have to create an internal model of the game state.
##
## Despite the lack of any lookahead, I still consider this AI to be
## chess-like because it evaluates every possible move and attack, even
## those that are obviously (to a human) bad. How can a computer know
## that these are bad moves unless it actually checks?
##
## The evaluation function is:
##
## (1) side_score = village_score
## + sum(unit_score, over all units)
## + positional_score
##
## The value of a unit can be highly subjective, but to simplify, assume
## that any level-1 unit is just as valuable as any other level-1 unit.
## Specifically, the value of a unit will be:
##
## (2) unit_score = (1 + level + %xp)(1 + %hp)
##
## Leaders are be considered three levels higher than their actual level.
## So a freshly-recruited level-1 unit is worth 4.0 points. And a level-2
## unit with half its hitpoints remaining, but halfway to level 3, is
## worth 6.75 points.
##
## One question is: How much is a village worth, compared to a (typical)
## unit? A typical unit is worth 15 to 20 gold, because that is how much
## we paid for it. A village is worth two or three gold *per turn* as
## long as it is held. (The village is worth three gold when it offsets
## a unit's upkeep.) So we must make some assumptions as to the value of
## a present gold piece, compared to a future gold piece. Assume a decay
## rate of 1.5 (i.e. a gold piece one turn from now is worth two-thirds
## of a gold piece now). This makes the present value of a village equal
## to twice its income. If we set the value of a typical unit at 16 gold,
## we get that an upkeep-offsetting village is worth 1.5 points, and a
## supernumerary village is worth 1.0 points. For simplicity, the value
## of each village is set at 1.0.
##
## (3) village_score = number of villages
##
## The positional score is the most interesting term of equation (1),
## because that, more than anything else, will guide the AI's behavior.
##
## First, we want the AI to expand to capture villages. So, for each unit,
## it is scored based on how far it is from the nearest unowned or enemy
## village. If the distance is zero, the unit has actually captured the
## village, so in that limit, the value should be equal to the village
## value. As the distance approaces infinity, the score should tend
## toward zero. This suggests something like:
##
## (4) village_proximity = c / (c + distance)
##
## I have selected c to be equal to equal to the unit's movement. This
## means that (approximately) a unit one turn away from capturing a village
## gets 0.5 points; two turns, 0.33 points, etc. Although an exponential
## relationship would be more accurate, exponentiation is expensive, and
## better avoided, since thousands of moves are evaluated per turn.
##
## Second, we want units to stand on defensive terrain when within range
## of the enemy. The 'right' way to do this would be to count up all the
## potential attackers at the destination square, see how much damage they
## might do, and score the move based on how much damage would be dealt/
## prevented. Again, this is much too slow. I have found a reasonable
## approximation is:
##
## (5) exposure_penalty = -defense_modifier / 10
##
## Maybe much too simple, but easy to calculate! In future editions, perhaps
## I should take into account how damaged the unit is, or at least make some
## attempt to count the attackers.
##
## Third, we want units to heal when damaged or poisoned. Referring to
## equation (2), we can see that the value of healing is:
##
## (6) healing_score = healing / max_hitpoints * (1 + level + %xp)
##
## We consider poison, which does 8 damage *per turn*, to be equivalent to
## 16 points of actual damage, for the same reason a village's real value is
## twice its income (see above).
##
## Fourth, we want units to guard villages if the enemy is in range to take
## them. If, by stationing a unit on a village, we prevent the enemy from
## taking it, we have prevented a 2-point swing in the enemy's favor. Again
## considering a decay rate of 2/3 per turn, this means the garrison value
## is 4/3. But since there is no guarantee that our garrison will be
## successful (perhaps the enemy will take the village anyway; perhaps it is
## not possible to garrison all threatened villages), we will cut this in half.
##
## (7) garrison_score = 2/3
##
## Fifth, we want our leader to stay near a keep. Otherwise, any gold we
## might have accumulated will be wasted. And finally, we want units to move
## toward the enemy leader. These are accomplished by treating keeps as
## if they were unowned villages (for our leader), and the enemy leader
## as if it were a village (for everyone else).
##
## This should be all that is required to play a decent game of Wesnoth.
## This AI scores quite well against the Wesnoth default AI, which may be
## surprising, because it uses no sophisticated tools. There is no attempt
## to use any of the path-finding tools made available by the API (which
## would be too slow to be used thousands of times every turn). There is
## no attempt to use combination attacks (meaning, that even though none of
## several units can favorably attack a certain target, if they all attack
## in the same turn, the result is likely to be favorable). No attempt is
## made to assign units individually to targets.
##
## Some bad behaviors may result from these shortcomings:
##
## If the map is maze-like, or simply has a few corners surrounded by
## impassable terrain, units may get stuck. On Cynsaun Battlefield, for
## example, a group of units got stuck in the middle of the river, trying
## to capture a village on the other side of the deep-water hexes.
##
## An enemy unit may get completely surrounded by friendly units, who are
## weak in comparison to the enemy, and our AI will make no attempt to kill
## the enemy unit. (Think six Wolf Riders surrounding an Orcish Grunt.)
## Usually one or more of these units will find something else to do, allowing
## a few Archers to take their place and start to wear down the Grunt. Or
## the Grunt will attack, getting damaged in the process, and creating a
## chance-to-kill for one of the Wolves.
##
## If there is an unoccupied village in a corner of the map, our AI will
## send every unit that is closer to the village than any other, to that
## village. Often, only one unit is necessary. Thus, harassing villages
## with scouts may be a much more viable strategy against this AI than
## against human players, or against the default AI.
##
## For those interested in results, I have set up a tournament between my
## AI and the default AI. The tournament consists of one match on each of
## the mainline two-player maps (except Wesbowl, naturally). In each map,
## each opponent is allowed to be player 1 once. If there is no decision
## after two games, two more games are played, repeating as necessary until
## one opponent has won the match. All games are played with a 50-turn
## limit, 2 gold per village, 70% experience, and no fog. (I think there
## is a bug (feature?) that AIs ignore fog, so I disabled it to improve the
## observer's (my) experience.) Factions are chosen randomly.
##
## Map W-L-D %Win Match result
## Blitz 2-0-0 100 Win
## Caves of the Basilisk 4-2-0 67 Win
## Charge 3-1-0 75 Win
## Cynsaun Battlefield (1gpv) 2-0-0 100 Win
## Den of Onis 4-2-0 67 Win
## Hamlets 2-0-0 100 Win
## Hornshark Island 0-2-0 0 Loss
## Meteor Lake 2-0-0 100 Win
## Sablestone Delta 2-0-0 100 Win
## Silverhead Crossing 3-1-0 75 Win
## Sulla's Ruins 2-0-0 100 Win
## ** Overall 25-8-0 76 10 Wins, 1 Loss (91%)
# UNIT SCORE MODIFIERS
BASE_UNIT_SCORE = 1 # Base worth of a unit
LEVEL_SCORE = 1 # Worth/level
LEADER_SCORE = 3 # Leader worth
FULL_XP_SCORE = 1 # How much is partial XP worth (1 is 100% XP = 1 pt)
# This score is then multiplied by a factor dependant on the price of the unit
# this makes expensive units worth more to the AI
COST_SCORE = 0 #
BASE_COST_SCORE = 1 #
# Formula:
# Base_Score = BASE_UNIT_SCORE + level * LEVEL_SCORE + is_leader * LEADER_SCORE + xp/max_xp * FULL_XP_SCORE
# Cost_Modifier = BASE_COST_SCORE + price * COST_SCORE
# Unit_Score(unit_k) = Base_Score * Cost_Modifier
# POSITION SCORE MODIFIERS
NO_MOVE_PEN = 0 # Penalty for not moving (doesn't quite work)
NEXT_TO_ENEMY_PEN = 0 # Penalty for moving next to an enemy and not attacking
STAND_NEXT_TO_ENEMY_PEN = 0 # Penalty for standing next to an enemy without moving or attacking
# MISC SCORE MODIFIERS
LEVEL_CHANCE_BONUS = 0 # How much a level-up is worth
VILLAGE_SCORE = 1 # How much capturing a village is worth
ENEMY_VILLAGE_BONUS = 1 # How much extra is an enemy village worth
GARRISON_SCORE = 2.0/3 # How much defending a village is worth
DEFENSE_FACTOR = 1.0/1000 # How much to penalize a unit for being in an attackable position
HEAL_FACTOR = 1 # How much is healing worth
HEAL_ATTACKABLE = .5 # How much relative to healing is healing when attackable worth
HEAL_POISON = 16 # How much is healing from poison worth
HP_SCALE = .1 # Unit HP/turn (for recruitment)
def pos(p):
if p==None: return ("Nowhere")
return ("(%s,%s)"%(p.x+1,p.y+1))
class AI:
def __init__(self):
self.get_villages()
self.get_keeps()
self.mapsize = max((wesnoth.get_map().x,wesnoth.get_map().y)) / 30.0
self.stats = [0,0]
def report_stats(self):
wesnoth.log_message("%d moves, %d fights evaluated" % (self.stats[0],self.stats[1]))
def get_villages(self):
self.notmyvillages = []
m = wesnoth.get_map()
for x in range(m.x):
for y in range(m.y):
loc = wesnoth.get_location(x,y)
if m.is_village(loc):
for team in wesnoth.get_teams():
if team.owns_village(loc) and not team.is_enemy:
break
else:
self.notmyvillages.append(loc)
def get_keeps(self):
self.keeps = []
m = wesnoth.get_map()
for x in range(m.x):
for y in range(m.y):
loc = wesnoth.get_location(x,y)
if m.is_keep(loc):
# If the enemy is occupying the keep, it is "off-limits" to our leader.
# Otherwise, if our leader has strayed too far, it might attempt to go
# to the enemy keep, which basically means we lose.
if loc not in wesnoth.get_enemy_destinations_by_unit().keys():
self.keeps.append(loc)
def recruit(self):
# I haven't discussed this at all. Perhaps a few paragraphs would be in order.
if wesnoth.get_current_team().gold < 16: return
# find our leader
leaderpos = None
for location,unit in wesnoth.get_units().iteritems():
if unit.can_recruit and unit.side == wesnoth.get_current_team().side:
leaderpos = location
break
# no leader? can't recruit
if leaderpos == None: return
# is our leader on a keep? If not, move to a keep
# Maybe should always go to nearest keep
if not leaderpos in self.keeps:
for dest in wesnoth.get_destinations_by_unit().get(leaderpos,[]):
if dest in self.keeps:
leaderpos = wesnoth.move_unit(leaderpos,dest)
break
# is our leader on a keep now? If not, can't recruit
if leaderpos not in self.keeps: return
# build up a list of recruits and scores for each
recruit_list = []
sumweights = 0
for recruit in wesnoth.get_current_team().recruits():
weight = self.recruit_score(recruit)
if weight < 0.01: weight = 0.01
recruit_list.append((recruit.name,weight))
sumweights += weight
# repeatedly recruit until we fail
while 1:
# pick a random recruit in proportion to the weights
r = wesnoth.get_random(0,sumweights)
for recruit,weight in recruit_list:
r -= weight
if r < 0: break
# just use leaderpos for the location; wesnoth will always
# recruit on the nearest adjacent tile
if not wesnoth.recruit_unit(recruit,leaderpos): break
def map_score(self,recruit):
# calculate average speed in hexes/turn
# and average defense in effective hp
m = wesnoth.get_map()
n = m.x * m.y
speed = 0.0
defense = 0.0
for x in range(m.x):
for y in range(m.y):
loc = wesnoth.get_location(x,y)
speed += 1.0 / recruit.movement_cost(loc)
rdm = recruit.defense_modifier(loc) - 1
if rdm:
defense += 100.0 / rdm
else:
defense += 1.00
# speed is more important on larger maps
speed *= self.mapsize * recruit.movement / n
# scaled down because effective hp is over the lifetime of the unit,
# while other scores are based on per-turn quantities
defense *= HP_SCALE * recruit.hitpoints / n
return speed,defense
def combat_score(self,recruit):
# combat advantage, in hp/turn, averaged over all enemy units
tot = 0.0
n = 0
for loc,enem in wesnoth.get_units().iteritems():
if not enem.is_enemy: continue
n += 1
tot += self.combat_advantage(recruit,enem)
tot -= self.combat_advantage(enem,recruit)
return tot/n
def combat_advantage(self,attacker,defender):
# combat advantage for attacker attacking defender
best = 0.0
for weapon in attacker.attacks():
damage = weapon.damage * weapon.num_attacks * defender.damage_from(weapon) / 100.0
best_retal = 0.0
for retaliation in defender.attacks():
if weapon.range == retaliation.range:
retal = retaliation.damage * retaliation.num_attacks * attacker.damage_from(retaliation) / 100.0
if retal > best_retal: best_retal = retal
damage -= best_retal
if damage > best: best = damage
# scale down because not every attack hits
return best/2
def recruit_score(self,recruit):
speed,defense = self.map_score(recruit)
combat = self.combat_score(recruit)
rval = (speed + defense + combat)/recruit.cost
# only report "interesting" results
if rval > 0:
wesnoth.log_message("%s: (%.2f + %.2f + %.2f) / %d = %.3f" % (recruit.name,speed,defense,combat,recruit.cost,rval))
return rval
def do_one_move(self):
enemlocs = wesnoth.get_enemy_destinations_by_unit().keys()
self.enemdests = wesnoth.get_enemy_units_by_destination().keys()
bestmove = (0,None,None,None) # score,orig,dest,target
# find the best move
for orig in wesnoth.get_destinations_by_unit().keys():
# get a baseline score for this unit "standing pat"
base_score = self.eval_move(orig,orig)
for dest in wesnoth.get_destinations_by_unit()[orig]:
# Bug workaround -- if we have recruited this turn,
# get_destinations_by_unit() is incorrect
if dest in wesnoth.get_units().keys() and dest != orig: continue
score = self.eval_move(orig,dest) - base_score
if score > bestmove[0]:
bestmove = (score,orig,dest,dest)
for target in wesnoth.get_adjacent_tiles(dest):
if target in enemlocs:
fight = self.eval_fight(wesnoth.get_units()[orig],dest,target)+score
if orig == dest:
fight += STAND_NEXT_TO_ENEMY_PEN + NO_MOVE_PEN
else:
fight += NEXT_TO_ENEMY_PEN
if fight > bestmove[0]:
bestmove = (fight,orig,dest,target)
if bestmove[1] == None:
# no move improved the position, therefore we are done
return False
score,orig,dest,target = bestmove
wesnoth.log_message("%.3f: %s->%s@%s"%(score,pos(orig),pos(dest),pos(target)))
if dest != orig: wesnoth.move_unit(orig,dest)
if dest in self.notmyvillages: self.notmyvillages.remove(dest)
if target != dest: wesnoth.attack_unit(dest,target)
return True
def eval_fight(self,unit,dest,target):
self.stats[1] += 1
enem = wesnoth.get_units().get(target,None)
if not enem: return 0
# the base value for each unit:
# I should give more weight to defeating a garrison
unit_k = (LEVEL_SCORE*unit.type().level + BASE_UNIT_SCORE + LEADER_SCORE*unit.can_recruit\
+ FULL_XP_SCORE * unit.experience * 1.0 / unit.max_experience) * (BASE_COST_SCORE + unit.type().cost * COST_SCORE)
enem_k = (LEVEL_SCORE*enem.type().level + BASE_UNIT_SCORE + LEADER_SCORE*enem.can_recruit\
+ FULL_XP_SCORE * enem.experience * 1.0 / enem.max_experience) * (BASE_COST_SCORE + enem.type().cost * COST_SCORE)
unit_hp,enem_hp = unit.attack_statistics(dest,target)
score = 0.0
for hp,p in enem_hp.iteritems():
score += p * (enem.hitpoints - hp) * enem_k / enem.max_hitpoints
if hp<=0: score += p * enem_k
for hp,p in unit_hp.iteritems():
score -= p * (unit.hitpoints - hp) * unit_k / unit.max_hitpoints
if hp<=0: score -= p * unit_k
enem_xp = 8*enem.type().level
if enem.type().level == 0:
enem_xp = 4
unit_xp = 8*unit.type().level
if unit.type().level == 0:
unit_xp = 4
if enem.type().level >= unit.max_experience - unit.experience:
for hp, p in unit_hp.iteritems():
if hp > 0: score += LEVEL_CHANCE_BONUS * p * unit_k
elif enem_xp >= unit.max_experience - unit.experience:
for hp, p in enem_hp.iteritems():
if hp <= 0: score += LEVEL_CHANCE_BONUS * p * unit_k
if unit.type().level >= enem.max_experience - enem.experience:
for hp, p in enem_hp.iteritems():
if hp > 0: score -= LEVEL_CHANCE_BONUS * p * enem_k
elif unit_xp >= enem.max_experience - enem.experience:
for hp, p in unit_hp.iteritems():
if hp <= 0: score += LEVEL_CHANCE_BONUS * p * enem_k
return score
def eval_move(self,orig,dest):
enemlocs = wesnoth.get_enemy_destinations_by_unit().keys()
self.stats[0] += 1
score = 0.0
unit = wesnoth.get_units().get(orig,None)
if not unit: return
unit_k = (LEVEL_SCORE*unit.type().level + BASE_UNIT_SCORE + LEADER_SCORE*unit.can_recruit\
+ FULL_XP_SCORE * unit.experience * 1.0 / unit.max_experience) * (BASE_COST_SCORE + unit.type().cost * COST_SCORE)
# subtract 1 because terrain might be a factor
speed = unit.type().movement - 1
attackable=False
if dest in self.enemdests:
attackable = True
else:
for adj in wesnoth.get_adjacent_tiles(dest):
if adj in self.enemdests:
attackable = True
break
# capture villages
if dest in self.notmyvillages:
score += VILLAGE_SCORE
for team in wesnoth.get_teams():
if team.owns_village(dest) and team.is_enemy:
score += ENEMY_VILLAGE_BONUS
bestdist=100
if unit.can_recruit:
# leader stays near keep
for keep in self.keeps:
dist=dest.distance_to(keep)
if dist<bestdist:
bestdist=dist
if dist<=1: break
else:
# everyone else moves toward enemy leader
for loc,enem in wesnoth.get_units().iteritems():
if enem.is_enemy and enem.can_recruit:
dist=dest.distance_to(loc)
if dist<bestdist:
bestdist=dist
if dist<=1: break
if bestdist > 1:
for vil in self.notmyvillages:
if dest==vil: continue
dist=dest.distance_to(vil)
if dist<bestdist:
bestdist=dist
if dist<=1: break
score += (1.0 * speed) / (bestdist + speed)
# healing
# I am ignoring the value of healers, and regenerating units. I don't think unit abilities
# are correctly reported by the API, anyway.
if (unit.poisoned or unit.hitpoints<unit.max_hitpoints) and wesnoth.get_map().is_village(dest):
if unit.poisoned: healing = HEAL_POISON
else:
healing = unit.max_hitpoints-unit.hitpoints
if healing > 8: healing = 8
# reduce the healing bonus if we might get killed first
if attackable: healing *= HEAL_ATTACKABLE
score += HEAL_FACTOR * healing * unit_k / unit.max_hitpoints
if attackable:
# defense
score -= unit.defense_modifier(dest) * DEFENSE_FACTOR
# garrison
if wesnoth.get_map().is_village(dest): score += GARRISON_SCORE
# reduce chances of standing next to a unit without attacking for a whole turn
if dest == orig:
score -= NO_MOVE_PEN
for target in wesnoth.get_adjacent_tiles(dest):
if target in enemlocs:
score -= STAND_NEXT_TO_ENEMY_PEN
break
else:
for target in wesnoth.get_adjacent_tiles(dest):
if target in enemlocs:
score -= NEXT_TO_ENEMY_PEN
break
# end mod
return score
#import time
#st = time.time()
#import sys
print "Running bruteforce ai."
print "Wesnoth", wesnoth.get_version()
#print "Python", sys.version
ai = AI()
ai.recruit()
while True:
if not ai.do_one_move():
break
ai.recruit()
ai.report_stats()
#print "======================="
#print "bruteforce ran for %0.4f-seconds." % (time.time() - st)
#print "======================="

View File

@ -1,608 +0,0 @@
#!UNSAFE_WPY
## Copyright 2006 by Michael Schmahl
## This code is available under the latest version of the GNU Public License.
## See COPYING for details. Some inspiration and code derived from "sample.py"
## by allefant.
##
## This is my attempt at a 'chess-like' AI. All moves are motivated by
## an underlying evaluation function. The actual eval function doesn't
## need to be coded, because moves can be scored and ranked based on the
## incremental change in the evaluation. Unlike a chess-playing program,
## though, this program does no lookahead, because the branching factor
## is prohibitively high (potentially in the thousands), and because then
## the script would have to create an internal model of the game state.
##
## Despite the lack of any lookahead, I still consider this AI to be
## chess-like because it evaluates every possible move and attack, even
## those that are obviously (to a human) bad. How can a computer know
## that these are bad moves unless it actually checks?
##
## The evaluation function is:
##
## (1) side_score = village_score
## + sum(unit_score, over all units)
## + positional_score
##
## The value of a unit can be highly subjective, but to simplify, assume
## that any level-1 unit is just as valuable as any other level-1 unit.
## Specifically, the value of a unit will be:
##
## (2) unit_score = (1 + level + %xp)(1 + %hp)
##
## Leaders are be considered three levels higher than their actual level.
## So a freshly-recruited level-1 unit is worth 4.0 points. And a level-2
## unit with half its hitpoints remaining, but halfway to level 3, is
## worth 6.75 points.
##
## One question is: How much is a village worth, compared to a (typical)
## unit? A typical unit is worth 15 to 20 gold, because that is how much
## we paid for it. A village is worth two or three gold *per turn* as
## long as it is held. (The village is worth three gold when it offsets
## a unit's upkeep.) So we must make some assumptions as to the value of
## a present gold piece, compared to a future gold piece. Assume a decay
## rate of 1.5 (i.e. a gold piece one turn from now is worth two-thirds
## of a gold piece now). This makes the present value of a village equal
## to twice its income. If we set the value of a typical unit at 16 gold,
## we get that an upkeep-offsetting village is worth 1.5 points, and a
## supernumerary village is worth 1.0 points. For simplicity, the value
## of each village is set at 1.0.
##
## (3) village_score = number of villages
##
## The positional score is the most interesting term of equation (1),
## because that, more than anything else, will guide the AI's behavior.
##
## First, we want the AI to expand to capture villages. So, for each unit,
## it is scored based on how far it is from the nearest unowned or enemy
## village. If the distance is zero, the unit has actually captured the
## village, so in that limit, the value should be equal to the village
## value. As the distance approaces infinity, the score should tend
## toward zero. This suggests something like:
##
## (4) village_proximity = c / (c + distance)
##
## I have selected c to be equal to equal to the unit's movement. This
## means that (approximately) a unit one turn away from capturing a village
## gets 0.5 points; two turns, 0.33 points, etc. Although an exponential
## relationship would be more accurate, exponentiation is expensive, and
## better avoided, since thousands of moves are evaluated per turn.
##
## Second, we want units to stand on defensive terrain when within range
## of the enemy. The 'right' way to do this would be to count up all the
## potential attackers at the destination square, see how much damage they
## might do, and score the move based on how much damage would be dealt/
## prevented. Again, this is much too slow. I have found a reasonable
## approximation is:
##
## (5) exposure_penalty = -defense_modifier / 10
##
## Maybe much too simple, but easy to calculate! In future editions, perhaps
## I should take into account how damaged the unit is, or at least make some
## attempt to count the attackers.
##
## Third, we want units to heal when damaged or poisoned. Referring to
## equation (2), we can see that the value of healing is:
##
## (6) healing_score = healing / max_hitpoints * (1 + level + %xp)
##
## We consider poison, which does 8 damage *per turn*, to be equivalent to
## 16 points of actual damage, for the same reason a village's real value is
## twice its income (see above).
##
## Fourth, we want units to guard villages if the enemy is in range to take
## them. If, by stationing a unit on a village, we prevent the enemy from
## taking it, we have prevented a 2-point swing in the enemy's favor. Again
## considering a decay rate of 2/3 per turn, this means the garrison value
## is 4/3. But since there is no guarantee that our garrison will be
## successful (perhaps the enemy will take the village anyway; perhaps it is
## not possible to garrison all threatened villages), we will cut this in half.
##
## (7) garrison_score = 2/3
##
## Fifth, we want our leader to stay near a keep. Otherwise, any gold we
## might have accumulated will be wasted. And finally, we want units to move
## toward the enemy leader. These are accomplished by treating keeps as
## if they were unowned villages (for our leader), and the enemy leader
## as if it were a village (for everyone else).
##
## This should be all that is required to play a decent game of Wesnoth.
## This AI scores quite well against the Wesnoth default AI, which may be
## surprising, because it uses no sophisticated tools. There is no attempt
## to use any of the path-finding tools made available by the API (which
## would be too slow to be used thousands of times every turn). There is
## no attempt to use combination attacks (meaning, that even though none of
## several units can favorably attack a certain target, if they all attack
## in the same turn, the result is likely to be favorable). No attempt is
## made to assign units individually to targets.
##
## Some bad behaviors may result from these shortcomings:
##
## If the map is maze-like, or simply has a few corners surrounded by
## impassable terrain, units may get stuck. On Cynsaun Battlefield, for
## example, a group of units got stuck in the middle of the river, trying
## to capture a village on the other side of the deep-water hexes.
##
## An enemy unit may get completely surrounded by friendly units, who are
## weak in comparison to the enemy, and our AI will make no attempt to kill
## the enemy unit. (Think six Wolf Riders surrounding an Orcish Grunt.)
## Usually one or more of these units will find something else to do, allowing
## a few Archers to take their place and start to wear down the Grunt. Or
## the Grunt will attack, getting damaged in the process, and creating a
## chance-to-kill for one of the Wolves.
##
## If there is an unoccupied village in a corner of the map, our AI will
## send every unit that is closer to the village than any other, to that
## village. Often, only one unit is necessary. Thus, harassing villages
## with scouts may be a much more viable strategy against this AI than
## against human players, or against the default AI.
##
## For those interested in results, I have set up a tournament between my
## AI and the default AI. The tournament consists of one match on each of
## the mainline two-player maps (except Wesbowl, naturally). In each map,
## each opponent is allowed to be player 1 once. If there is no decision
## after two games, two more games are played, repeating as necessary until
## one opponent has won the match. All games are played with a 50-turn
## limit, 2 gold per village, 70% experience, and no fog. (I think there
## is a bug (feature?) that AIs ignore fog, so I disabled it to improve the
## observer's (my) experience.) Factions are chosen randomly.
##
## Map W-L-D %Win Match result
## Blitz 2-0-0 100 Win
## Caves of the Basilisk 4-2-0 67 Win
## Charge 3-1-0 75 Win
## Cynsaun Battlefield (1gpv) 2-0-0 100 Win
## Den of Onis 4-2-0 67 Win
## Hamlets 2-0-0 100 Win
## Hornshark Island 0-2-0 0 Loss
## Meteor Lake 2-0-0 100 Win
## Sablestone Delta 2-0-0 100 Win
## Silverhead Crossing 3-1-0 75 Win
## Sulla's Ruins 2-0-0 100 Win
## ** Overall 25-8-0 76 10 Wins, 1 Loss (91%)
import time
import ai as wesnoth
assert not restricted, "Can only be run outside of restricted environment!"
try:
import psyco
psyco.full()
except ImportError:
print "Psyco is not available - no point in continuing..."
print "Aborting AI execution."
raise
class memoized( object ):
"""
Decorator that caches a function's return value each time it is called.
If called later with the same arguments, the cached value is returned, and
not re-evaluated.
"""
def __init__( self, func ):
super( memoized, self ).__init__()
self.__func = func
self.__cache = {}
self.__calls = 0L
self.__cacheHits = 0L
def __repr__( self ):
"""Return the function's docstring."""
return self.__func.__doc__
def __call__( self, *args, **kw ):
self.__calls += 1L
funcKey = ( args, tuple( kw.iteritems() ) )
try:
result = self.__cache[ funcKey ]
self.__cacheHits += 1L
return result
except KeyError:
self.__cache[ funcKey ] = retValue = self.__func( *args, **kw )
return retValue
except TypeError:
# uncachable -- for instance, passing a list as an argument.
# Better to not cache than to blow up entirely.
return self.__func( *args, **kw )
def reset( self ):
"""
Reset the cache. Use when results of underlying function call may
have changed.
"""
self.__cache.clear()
def stats( self ):
return float(self.__cacheHits) / self.__calls
# If AI is playing on a map where can pysically change during the AI's turn,
# this redefinition must be commented out.
wesnoth.get_location = memoized( wesnoth.get_location )
# UNIT SCORE MODIFIERS
BASE_UNIT_SCORE = 1 # Base worth of a unit
LEVEL_SCORE = 1 # Worth/level
LEADER_SCORE = 3 # Leader worth
FULL_XP_SCORE = 1 # How much is partial XP worth (1 is 100% XP = 1 pt)
# This score is then multiplied by a factor dependant on the price of the unit
# this makes expensive units worth more to the AI
COST_SCORE = 0 #
BASE_COST_SCORE = 1 #
# Formula:
# Base_Score = BASE_UNIT_SCORE + level * LEVEL_SCORE + is_leader * LEADER_SCORE + xp/max_xp * FULL_XP_SCORE
# Cost_Modifier = BASE_COST_SCORE + price * COST_SCORE
# Unit_Score(unit_k) = Base_Score * Cost_Modifier
# POSITION SCORE MODIFIERS
NO_MOVE_PEN = 0 # Penalty for not moving (doesn't quite work)
NEXT_TO_ENEMY_PEN = 0 # Penalty for moving next to an enemy and not attacking
STAND_NEXT_TO_ENEMY_PEN = 0 # Penalty for standing next to an enemy without moving or attacking
# MISC SCORE MODIFIERS
LEVEL_CHANCE_BONUS = 0 # How much a level-up is worth
VILLAGE_SCORE = 1 # How much capturing a village is worth
ENEMY_VILLAGE_BONUS = 1 # How much extra is an enemy village worth
GARRISON_SCORE = 2.0/3 # How much defending a village is worth
DEFENSE_FACTOR = 1.0/1000 # How much to penalize a unit for being in an attackable position
HEAL_FACTOR = 1 # How much is healing worth
HEAL_ATTACKABLE = .5 # How much relative to healing is healing when attackable worth
HEAL_POISON = 16 # How much is healing from poison worth
HP_SCALE = .1 # Unit HP/turn (for recruitment)
def pos(p):
if p==None: return ("Nowhere")
return ("(%s,%s)"%(p.x+1,p.y+1))
class AI:
def __init__(self):
self.get_villages()
self.get_keeps()
self.mapsize = max((wesnoth.get_map().x,wesnoth.get_map().y)) / 30.0
self.stats = [0,0]
def report_stats(self):
wesnoth.log_message("%d moves, %d fights evaluated" % (self.stats[0],self.stats[1]))
def get_villages(self):
self.notmyvillages = []
m = wesnoth.get_map()
for x in range(m.x):
for y in range(m.y):
loc = wesnoth.get_location(x,y)
if m.is_village(loc):
for team in wesnoth.get_teams():
if team.owns_village(loc) and not team.is_enemy:
break
else:
self.notmyvillages.append(loc)
def get_keeps(self):
self.keeps = []
m = wesnoth.get_map()
for x in range(m.x):
for y in range(m.y):
loc = wesnoth.get_location(x,y)
if m.is_keep(loc):
# If the enemy is occupying the keep, it is "off-limits" to our leader.
# Otherwise, if our leader has strayed too far, it might attempt to go
# to the enemy keep, which basically means we lose.
if loc not in wesnoth.get_enemy_destinations_by_unit().keys():
self.keeps.append(loc)
def recruit(self):
# I haven't discussed this at all. Perhaps a few paragraphs would be in order.
if wesnoth.get_current_team().gold < 16: return
# find our leader
leaderpos = None
for location,unit in wesnoth.get_units().iteritems():
if unit.can_recruit and unit.side == wesnoth.get_current_team().side:
leaderpos = location
break
# no leader? can't recruit
if leaderpos == None: return
# is our leader on a keep? If not, move to a keep
# Maybe should always go to nearest keep
if not leaderpos in self.keeps:
for dest in wesnoth.get_destinations_by_unit().get(leaderpos,[]):
if dest in self.keeps:
leaderpos = wesnoth.move_unit(leaderpos,dest)
break
# is our leader on a keep now? If not, can't recruit
if leaderpos not in self.keeps: return
# build up a list of recruits and scores for each
recruit_list = []
sumweights = 0
for recruit in wesnoth.get_current_team().recruits():
weight = self.recruit_score(recruit)
if weight < 0.01: weight = 0.01
recruit_list.append((recruit.name,weight))
sumweights += weight
# repeatedly recruit until we fail
while 1:
# pick a random recruit in proportion to the weights
r = wesnoth.get_random(0,sumweights)
for recruit,weight in recruit_list:
r -= weight
if r < 0: break
# just use leaderpos for the location; wesnoth will always
# recruit on the nearest adjacent tile
if not wesnoth.recruit_unit(recruit,leaderpos): break
def map_score(self,recruit):
# calculate average speed in hexes/turn
# and average defense in effective hp
m = wesnoth.get_map()
n = m.x * m.y
speed = 0.0
defense = 0.0
for x in range(m.x):
for y in range(m.y):
loc = wesnoth.get_location(x,y)
speed += 1.0 / recruit.movement_cost(loc)
rdm = recruit.defense_modifier(loc) - 1
if rdm:
defense += 100.0 / rdm
else:
defense += 1.00
# speed is more important on larger maps
speed *= self.mapsize * recruit.movement / n
# scaled down because effective hp is over the lifetime of the unit,
# while other scores are based on per-turn quantities
defense *= HP_SCALE * recruit.hitpoints / n
return speed,defense
def combat_score(self,recruit):
# combat advantage, in hp/turn, averaged over all enemy units
tot = 0.0
n = 0
for loc,enem in wesnoth.get_units().iteritems():
if not enem.is_enemy: continue
n += 1
tot += self.combat_advantage(recruit,enem)
tot -= self.combat_advantage(enem,recruit)
return tot/n
def combat_advantage(self,attacker,defender):
# combat advantage for attacker attacking defender
best = 0.0
for weapon in attacker.attacks():
damage = weapon.damage * weapon.num_attacks * defender.damage_from(weapon) / 100.0
best_retal = 0.0
for retaliation in defender.attacks():
if weapon.range == retaliation.range:
retal = retaliation.damage * retaliation.num_attacks * attacker.damage_from(retaliation) / 100.0
if retal > best_retal: best_retal = retal
damage -= best_retal
if damage > best: best = damage
# scale down because not every attack hits
return best/2
def recruit_score(self,recruit):
speed,defense = self.map_score(recruit)
combat = self.combat_score(recruit)
rval = (speed + defense + combat)/recruit.cost
# only report "interesting" results
if rval > 0:
wesnoth.log_message("%s: (%.2f + %.2f + %.2f) / %d = %.3f" % (recruit.name,speed,defense,combat,recruit.cost,rval))
return rval
def do_one_move(self):
enemlocs = wesnoth.get_enemy_destinations_by_unit().keys()
self.enemdests = wesnoth.get_enemy_units_by_destination().keys()
bestmove = (0,None,None,None) # score,orig,dest,target
# find the best move
for orig in wesnoth.get_destinations_by_unit().keys():
# get a baseline score for this unit "standing pat"
base_score = self.eval_move(orig,orig)
for dest in wesnoth.get_destinations_by_unit()[orig]:
# Bug workaround -- if we have recruited this turn,
# get_destinations_by_unit() is incorrect
if dest in wesnoth.get_units().keys() and dest != orig: continue
score = self.eval_move(orig,dest) - base_score
if score > bestmove[0]:
bestmove = (score,orig,dest,dest)
for target in wesnoth.get_adjacent_tiles(dest):
if target in enemlocs:
fight = self.eval_fight(wesnoth.get_units()[orig],dest,target)+score
if orig == dest:
fight += STAND_NEXT_TO_ENEMY_PEN + NO_MOVE_PEN
else:
fight += NEXT_TO_ENEMY_PEN
if fight > bestmove[0]:
bestmove = (fight,orig,dest,target)
if bestmove[1] == None:
# no move improved the position, therefore we are done
return False
score,orig,dest,target = bestmove
wesnoth.log_message("%.3f: %s->%s@%s"%(score,pos(orig),pos(dest),pos(target)))
if dest != orig: wesnoth.move_unit(orig,dest)
if dest in self.notmyvillages: self.notmyvillages.remove(dest)
if target != dest: wesnoth.attack_unit(dest,target)
return True
def eval_fight(self,unit,dest,target):
self.stats[1] += 1
enem = wesnoth.get_units().get(target,None)
if not enem: return 0
# the base value for each unit:
# I should give more weight to defeating a garrison
unit_k = (LEVEL_SCORE*unit.type().level + BASE_UNIT_SCORE + LEADER_SCORE*unit.can_recruit\
+ FULL_XP_SCORE * unit.experience * 1.0 / unit.max_experience) * (BASE_COST_SCORE + unit.type().cost * COST_SCORE)
enem_k = (LEVEL_SCORE*enem.type().level + BASE_UNIT_SCORE + LEADER_SCORE*enem.can_recruit\
+ FULL_XP_SCORE * enem.experience * 1.0 / enem.max_experience) * (BASE_COST_SCORE + enem.type().cost * COST_SCORE)
unit_hp,enem_hp = unit.attack_statistics(dest,target)
score = 0.0
for hp,p in enem_hp.iteritems():
score += p * (enem.hitpoints - hp) * enem_k / enem.max_hitpoints
if hp<=0: score += p * enem_k
for hp,p in unit_hp.iteritems():
score -= p * (unit.hitpoints - hp) * unit_k / unit.max_hitpoints
if hp<=0: score -= p * unit_k
enem_xp = 8*enem.type().level
if enem.type().level == 0:
enem_xp = 4
unit_xp = 8*unit.type().level
if unit.type().level == 0:
unit_xp = 4
if enem.type().level >= unit.max_experience - unit.experience:
for hp, p in unit_hp.iteritems():
if hp > 0: score += LEVEL_CHANCE_BONUS * p * unit_k
elif enem_xp >= unit.max_experience - unit.experience:
for hp, p in enem_hp.iteritems():
if hp <= 0: score += LEVEL_CHANCE_BONUS * p * unit_k
if unit.type().level >= enem.max_experience - enem.experience:
for hp, p in enem_hp.iteritems():
if hp > 0: score -= LEVEL_CHANCE_BONUS * p * enem_k
elif unit_xp >= enem.max_experience - enem.experience:
for hp, p in unit_hp.iteritems():
if hp <= 0: score += LEVEL_CHANCE_BONUS * p * enem_k
return score
def eval_move(self,orig,dest):
enemlocs = wesnoth.get_enemy_destinations_by_unit().keys()
self.stats[0] += 1
score = 0.0
unit = wesnoth.get_units().get(orig,None)
if not unit: return
unit_k = (LEVEL_SCORE*unit.type().level + BASE_UNIT_SCORE + LEADER_SCORE*unit.can_recruit\
+ FULL_XP_SCORE * unit.experience * 1.0 / unit.max_experience) * (BASE_COST_SCORE + unit.type().cost * COST_SCORE)
# subtract 1 because terrain might be a factor
speed = unit.type().movement - 1
attackable=False
if dest in self.enemdests:
attackable = True
else:
for adj in wesnoth.get_adjacent_tiles(dest):
if adj in self.enemdests:
attackable = True
break
# capture villages
if dest in self.notmyvillages:
score += VILLAGE_SCORE
for team in wesnoth.get_teams():
if team.owns_village(dest) and team.is_enemy:
score += ENEMY_VILLAGE_BONUS
bestdist=100
if unit.can_recruit:
# leader stays near keep
for keep in self.keeps:
dist=dest.distance_to(keep)
if dist<bestdist:
bestdist=dist
if dist<=1: break
else:
# everyone else moves toward enemy leader
for loc,enem in wesnoth.get_units().iteritems():
if enem.is_enemy and enem.can_recruit:
dist=dest.distance_to(loc)
if dist<bestdist:
bestdist=dist
if dist<=1: break
if bestdist > 1:
for vil in self.notmyvillages:
if dest==vil: continue
dist=dest.distance_to(vil)
if dist<bestdist:
bestdist=dist
if dist<=1: break
score += (1.0 * speed) / (bestdist + speed)
# healing
# I am ignoring the value of healers, and regenerating units. I don't think unit abilities
# are correctly reported by the API, anyway.
if (unit.poisoned or unit.hitpoints<unit.max_hitpoints) and wesnoth.get_map().is_village(dest):
if unit.poisoned: healing = HEAL_POISON
else:
healing = unit.max_hitpoints-unit.hitpoints
if healing > 8: healing = 8
# reduce the healing bonus if we might get killed first
if attackable: healing *= HEAL_ATTACKABLE
score += HEAL_FACTOR * healing * unit_k / unit.max_hitpoints
if attackable:
# defense
score -= unit.defense_modifier(dest) * DEFENSE_FACTOR
# garrison
if wesnoth.get_map().is_village(dest): score += GARRISON_SCORE
# reduce chances of standing next to a unit without attacking for a whole turn
if dest == orig:
score -= NO_MOVE_PEN
for target in wesnoth.get_adjacent_tiles(dest):
if target in enemlocs:
score -= STAND_NEXT_TO_ENEMY_PEN
break
else:
for target in wesnoth.get_adjacent_tiles(dest):
if target in enemlocs:
score -= NEXT_TO_ENEMY_PEN
break
# end mod
return score
import sys
print "Running bruteforce unsafe ai."
print "Wesnoth", wesnoth.get_version()
print "Python", sys.version
st = time.time()
ai = AI()
ai.recruit()
while True:
if not ai.do_one_move():
break
ai.recruit()
ai.report_stats()
print "======================="
print "cache ratio:", wesnoth.get_location.stats()
print "bruteforce_unsafe ran for %0.4f-seconds." % (time.time() - st)
print "======================="

View File

@ -1,555 +0,0 @@
#!UNSAFE_WPY
# Sample implementation - converting to the wail library rather than
# wesnoth directly. This show how easily bot writers can convert old
# bots, which maintained namespace, to the wail.
# Wail requires Python 2.5+. If the following statement fails, you likely
# are not running Python 2.5.x or later.
from __future__ import with_statement
import time
import wail
import ai
## Copyright 2006 by Michael Schmahl
## This code is available under the latest version of the GNU Public License.
## See COPYING for details. Some inspiration and code derived from "sample.py"
## by allefant.
##
## This is my attempt at a 'chess-like' AI. All moves are motivated by
## an underlying evaluation function. The actual eval function doesn't
## need to be coded, because moves can be scored and ranked based on the
## incremental change in the evaluation. Unlike a chess-playing program,
## though, this program does no lookahead, because the branching factor
## is prohibitively high (potentially in the thousands), and because then
## the script would have to create an internal model of the game state.
##
## Despite the lack of any lookahead, I still consider this AI to be
## chess-like because it evaluates every possible move and attack, even
## those that are obviously (to a human) bad. How can a computer know
## that these are bad moves unless it actually checks?
##
## The evaluation function is:
##
## (1) side_score = village_score
## + sum(unit_score, over all units)
## + positional_score
##
## The value of a unit can be highly subjective, but to simplify, assume
## that any level-1 unit is just as valuable as any other level-1 unit.
## Specifically, the value of a unit will be:
##
## (2) unit_score = (1 + level + %xp)(1 + %hp)
##
## Leaders are be considered three levels higher than their actual level.
## So a freshly-recruited level-1 unit is worth 4.0 points. And a level-2
## unit with half its hitpoints remaining, but halfway to level 3, is
## worth 6.75 points.
##
## One question is: How much is a village worth, compared to a (typical)
## unit? A typical unit is worth 15 to 20 gold, because that is how much
## we paid for it. A village is worth two or three gold *per turn* as
## long as it is held. (The village is worth three gold when it offsets
## a unit's upkeep.) So we must make some assumptions as to the value of
## a present gold piece, compared to a future gold piece. Assume a decay
## rate of 1.5 (i.e. a gold piece one turn from now is worth two-thirds
## of a gold piece now). This makes the present value of a village equal
## to twice its income. If we set the value of a typical unit at 16 gold,
## we get that an upkeep-offsetting village is worth 1.5 points, and a
## supernumerary village is worth 1.0 points. For simplicity, the value
## of each village is set at 1.0.
##
## (3) village_score = number of villages
##
## The positional score is the most interesting term of equation (1),
## because that, more than anything else, will guide the AI's behavior.
##
## First, we want the AI to expand to capture villages. So, for each unit,
## it is scored based on how far it is from the nearest unowned or enemy
## village. If the distance is zero, the unit has actually captured the
## village, so in that limit, the value should be equal to the village
## value. As the distance approaces infinity, the score should tend
## toward zero. This suggests something like:
##
## (4) village_proximity = c / (c + distance)
##
## I have selected c to be equal to equal to the unit's movement. This
## means that (approximately) a unit one turn away from capturing a village
## gets 0.5 points; two turns, 0.33 points, etc. Although an exponential
## relationship would be more accurate, exponentiation is expensive, and
## better avoided, since thousands of moves are evaluated per turn.
##
## Second, we want units to stand on defensive terrain when within range
## of the enemy. The 'right' way to do this would be to count up all the
## potential attackers at the destination square, see how much damage they
## might do, and score the move based on how much damage would be dealt/
## prevented. Again, this is much too slow. I have found a reasonable
## approximation is:
##
## (5) exposure_penalty = -defense_modifier / 10
##
## Maybe much too simple, but easy to calculate! In future editions, perhaps
## I should take into account how damaged the unit is, or at least make some
## attempt to count the attackers.
##
## Third, we want units to heal when damaged or poisoned. Referring to
## equation (2), we can see that the value of healing is:
##
## (6) healing_score = healing / max_hitpoints * (1 + level + %xp)
##
## We consider poison, which does 8 damage *per turn*, to be equivalent to
## 16 points of actual damage, for the same reason a village's real value is
## twice its income (see above).
##
## Fourth, we want units to guard villages if the enemy is in range to take
## them. If, by stationing a unit on a village, we prevent the enemy from
## taking it, we have prevented a 2-point swing in the enemy's favor. Again
## considering a decay rate of 2/3 per turn, this means the garrison value
## is 4/3. But since there is no guarantee that our garrison will be
## successful (perhaps the enemy will take the village anyway; perhaps it is
## not possible to garrison all threatened villages), we will cut this in half.
##
## (7) garrison_score = 2/3
##
## Fifth, we want our leader to stay near a keep. Otherwise, any gold we
## might have accumulated will be wasted. And finally, we want units to move
## toward the enemy leader. These are accomplished by treating keeps as
## if they were unowned villages (for our leader), and the enemy leader
## as if it were a village (for everyone else).
##
## This should be all that is required to play a decent game of Wesnoth.
## This AI scores quite well against the Wesnoth default AI, which may be
## surprising, because it uses no sophisticated tools. There is no attempt
## to use any of the path-finding tools made available by the API (which
## would be too slow to be used thousands of times every turn). There is
## no attempt to use combination attacks (meaning, that even though none of
## several units can favorably attack a certain target, if they all attack
## in the same turn, the result is likely to be favorable). No attempt is
## made to assign units individually to targets.
##
## Some bad behaviors may result from these shortcomings:
##
## If the map is maze-like, or simply has a few corners surrounded by
## impassable terrain, units may get stuck. On Cynsaun Battlefield, for
## example, a group of units got stuck in the middle of the river, trying
## to capture a village on the other side of the deep-water hexes.
##
## An enemy unit may get completely surrounded by friendly units, who are
## weak in comparison to the enemy, and our AI will make no attempt to kill
## the enemy unit. (Think six Wolf Riders surrounding an Orcish Grunt.)
## Usually one or more of these units will find something else to do, allowing
## a few Archers to take their place and start to wear down the Grunt. Or
## the Grunt will attack, getting damaged in the process, and creating a
## chance-to-kill for one of the Wolves.
##
## If there is an unoccupied village in a corner of the map, our AI will
## send every unit that is closer to the village than any other, to that
## village. Often, only one unit is necessary. Thus, harassing villages
## with scouts may be a much more viable strategy against this AI than
## against human players, or against the default AI.
##
## For those interested in results, I have set up a tournament between my
## AI and the default AI. The tournament consists of one match on each of
## the mainline two-player maps (except Wesbowl, naturally). In each map,
## each opponent is allowed to be player 1 once. If there is no decision
## after two games, two more games are played, repeating as necessary until
## one opponent has won the match. All games are played with a 50-turn
## limit, 2 gold per village, 70% experience, and no fog. (I think there
## is a bug (feature?) that AIs ignore fog, so I disabled it to improve the
## observer's (my) experience.) Factions are chosen randomly.
##
## Map W-L-D %Win Match result
## Blitz 2-0-0 100 Win
## Caves of the Basilisk 4-2-0 67 Win
## Charge 3-1-0 75 Win
## Cynsaun Battlefield (1gpv) 2-0-0 100 Win
## Den of Onis 4-2-0 67 Win
## Hamlets 2-0-0 100 Win
## Hornshark Island 0-2-0 0 Loss
## Meteor Lake 2-0-0 100 Win
## Sablestone Delta 2-0-0 100 Win
## Silverhead Crossing 3-1-0 75 Win
## Sulla's Ruins 2-0-0 100 Win
## ** Overall 25-8-0 76 10 Wins, 1 Loss (91%)
# UNIT SCORE MODIFIERS
BASE_UNIT_SCORE = 1 # Base worth of a unit
LEVEL_SCORE = 1 # Worth/level
LEADER_SCORE = 3 # Leader worth
FULL_XP_SCORE = 1 # How much is partial XP worth (1 is 100% XP = 1 pt)
# This score is then multiplied by a factor dependant on the price of the unit
# this makes expensive units worth more to the AI
COST_SCORE = 0 #
BASE_COST_SCORE = 1 #
# Formula:
# Base_Score = BASE_UNIT_SCORE + level * LEVEL_SCORE + is_leader * LEADER_SCORE + xp/max_xp * FULL_XP_SCORE
# Cost_Modifier = BASE_COST_SCORE + price * COST_SCORE
# Unit_Score(unit_k) = Base_Score * Cost_Modifier
# POSITION SCORE MODIFIERS
NO_MOVE_PEN = 0 # Penalty for not moving (doesn't quite work)
NEXT_TO_ENEMY_PEN = 0 # Penalty for moving next to an enemy and not attacking
STAND_NEXT_TO_ENEMY_PEN = 0 # Penalty for standing next to an enemy without moving or attacking
# MISC SCORE MODIFIERS
LEVEL_CHANCE_BONUS = 0 # How much a level-up is worth
VILLAGE_SCORE = 1 # How much capturing a village is worth
ENEMY_VILLAGE_BONUS = 1 # How much extra is an enemy village worth
GARRISON_SCORE = 2.0/3 # How much defending a village is worth
DEFENSE_FACTOR = 1.0/1000 # How much to penalize a unit for being in an attackable position
HEAL_FACTOR = 1 # How much is healing worth
HEAL_ATTACKABLE = .5 # How much relative to healing is healing when attackable worth
HEAL_POISON = 16 # How much is healing from poison worth
HP_SCALE = .1 # Unit HP/turn (for recruitment)
def pos(p):
if p==None: return ("Nowhere")
return ("(%s,%s)"%(p.x+1,p.y+1))
class AI:
def __init__(self):
self.get_villages()
self.get_keeps()
self.mapsize = max((wail.get_map().x,wail.get_map().y)) / 30.0
self.stats = [0,0]
def report_stats(self):
wail.log_message("%d moves, %d fights evaluated" % (self.stats[0],self.stats[1]))
def get_villages(self):
self.notmyvillages = []
m = wail.get_map()
for x in range(m.x):
for y in range(m.y):
loc = wail.get_location(x,y)
if m.is_village(loc):
for team in wail.get_teams():
if team.owns_village(loc) and not team.is_enemy:
break
else:
self.notmyvillages.append(loc)
def get_keeps(self):
self.keeps = []
m = wail.get_map()
for x in range(m.x):
for y in range(m.y):
loc = wail.get_location(x,y)
if m.is_keep(loc):
# If the enemy is occupying the keep, it is "off-limits" to our leader.
# Otherwise, if our leader has strayed too far, it might attempt to go
# to the enemy keep, which basically means we lose.
if loc not in wail.get_enemy_destinations_by_unit().keys():
self.keeps.append(loc)
def recruit(self):
# I haven't discussed this at all. Perhaps a few paragraphs would be in order.
if wail.get_current_team().gold < 16: return
# find our leader
leaderpos = None
for location,unit in wail.get_units().iteritems():
if unit.can_recruit and unit.side == wail.get_current_team().side:
leaderpos = location
break
# no leader? can't recruit
if leaderpos == None: return
# is our leader on a keep? If not, move to a keep
# Maybe should always go to nearest keep
if not leaderpos in self.keeps:
for dest in wail.get_destinations_by_unit().get(leaderpos,[]):
if dest in self.keeps:
leaderpos = wail.move_unit(leaderpos,dest)
break
# is our leader on a keep now? If not, can't recruit
if leaderpos not in self.keeps: return
# build up a list of recruits and scores for each
recruit_list = []
sumweights = 0
for recruit in wail.get_current_team().recruits():
weight = self.recruit_score(recruit)
if weight < 0.01: weight = 0.01
recruit_list.append((recruit.name,weight))
sumweights += weight
# repeatedly recruit until we fail
while 1:
# pick a random recruit in proportion to the weights
r = ai.get_random(0,sumweights)
for recruit,weight in recruit_list:
r -= weight
if r < 0: break
# just use leaderpos for the location; wesnoth will always
# recruit on the nearest adjacent tile
if not wail.recruit_unit(recruit,leaderpos): break
def map_score(self,recruit):
# calculate average speed in hexes/turn
# and average defense in effective hp
m = wail.get_map()
n = m.x * m.y
speed = 0.0
defense = 0.0
for x in range(m.x):
for y in range(m.y):
loc = wail.get_location(x,y)
speed += 1.0 / recruit.movement_cost(loc)
rdm = recruit.defense_modifier(loc) - 1
if rdm:
defense += 100.0 / rdm
else:
defense += 1.00
# speed is more important on larger maps
speed *= self.mapsize * recruit.movement / n
# scaled down because effective hp is over the lifetime of the unit,
# while other scores are based on per-turn quantities
defense *= HP_SCALE * recruit.hitpoints / n
return speed,defense
def combat_score(self,recruit):
# combat advantage, in hp/turn, averaged over all enemy units
tot = 0.0
n = 0
for loc,enem in wail.get_units().iteritems():
if not enem.is_enemy: continue
n += 1
tot += self.combat_advantage(recruit,enem)
tot -= self.combat_advantage(enem,recruit)
return tot/n
def combat_advantage(self,attacker,defender):
# combat advantage for attacker attacking defender
best = 0.0
for weapon in attacker.attacks():
damage = weapon.damage * weapon.num_attacks * defender.damage_from(weapon) / 100.0
best_retal = 0.0
for retaliation in defender.attacks():
if weapon.range == retaliation.range:
retal = retaliation.damage * retaliation.num_attacks * attacker.damage_from(retaliation) / 100.0
if retal > best_retal: best_retal = retal
damage -= best_retal
if damage > best: best = damage
# scale down because not every attack hits
return best/2
def recruit_score(self,recruit):
speed,defense = self.map_score(recruit)
combat = self.combat_score(recruit)
rval = (speed + defense + combat)/recruit.cost
# only report "interesting" results
if rval > 0:
wail.log_message("%s: (%.2f + %.2f + %.2f) / %d = %.3f" % (recruit.name,speed,defense,combat,recruit.cost,rval))
return rval
def do_one_move(self):
enemlocs = wail.get_enemy_destinations_by_unit().keys()
self.enemdests = wail.get_enemy_units_by_destination().keys()
bestmove = (0,None,None,None) # score,orig,dest,target
# find the best move
for orig in wail.get_destinations_by_unit().keys():
# get a baseline score for this unit "standing pat"
base_score = self.eval_move(orig,orig)
for dest in wail.get_destinations_by_unit()[orig]:
# Bug workaround -- if we have recruited this turn,
# get_destinations_by_unit() is incorrect
if dest in wail.get_units().keys() and dest != orig: continue
score = self.eval_move(orig,dest) - base_score
if score > bestmove[0]:
bestmove = (score,orig,dest,dest)
for target in wail.get_adjacent_tiles(dest):
if target in enemlocs:
fight = self.eval_fight(wail.get_units()[orig],dest,target)+score
if orig == dest:
fight += STAND_NEXT_TO_ENEMY_PEN + NO_MOVE_PEN
else:
fight += NEXT_TO_ENEMY_PEN
if fight > bestmove[0]:
bestmove = (fight,orig,dest,target)
if bestmove[1] == None:
# no move improved the position, therefore we are done
return False
score,orig,dest,target = bestmove
wail.log_message("%.3f: %s->%s@%s"%(score,pos(orig),pos(dest),pos(target)))
if dest != orig: wail.move_unit(orig,dest)
if dest in self.notmyvillages: self.notmyvillages.remove(dest)
if target != dest: wail.attack_unit(dest,target)
return True
def eval_fight(self,unit,dest,target):
self.stats[1] += 1
enem = wail.get_units().get(target,None)
if not enem: return 0
# the base value for each unit:
# I should give more weight to defeating a garrison
unit_k = (LEVEL_SCORE*unit.type().level + BASE_UNIT_SCORE + LEADER_SCORE*unit.can_recruit\
+ FULL_XP_SCORE * unit.experience * 1.0 / unit.max_experience) * (BASE_COST_SCORE + unit.type().cost * COST_SCORE)
enem_k = (LEVEL_SCORE*enem.type().level + BASE_UNIT_SCORE + LEADER_SCORE*enem.can_recruit\
+ FULL_XP_SCORE * enem.experience * 1.0 / enem.max_experience) * (BASE_COST_SCORE + enem.type().cost * COST_SCORE)
unit_hp,enem_hp = unit.attack_statistics(dest,target)
score = 0.0
for hp,p in enem_hp.iteritems():
score += p * (enem.hitpoints - hp) * enem_k / enem.max_hitpoints
if hp<=0: score += p * enem_k
for hp,p in unit_hp.iteritems():
score -= p * (unit.hitpoints - hp) * unit_k / unit.max_hitpoints
if hp<=0: score -= p * unit_k
enem_xp = 8*enem.type().level
if enem.type().level == 0:
enem_xp = 4
unit_xp = 8*unit.type().level
if unit.type().level == 0:
unit_xp = 4
if enem.type().level >= unit.max_experience - unit.experience:
for hp, p in unit_hp.iteritems():
if hp > 0: score += LEVEL_CHANCE_BONUS * p * unit_k
elif enem_xp >= unit.max_experience - unit.experience:
for hp, p in enem_hp.iteritems():
if hp <= 0: score += LEVEL_CHANCE_BONUS * p * unit_k
if unit.type().level >= enem.max_experience - enem.experience:
for hp, p in enem_hp.iteritems():
if hp > 0: score -= LEVEL_CHANCE_BONUS * p * enem_k
elif unit_xp >= enem.max_experience - enem.experience:
for hp, p in unit_hp.iteritems():
if hp <= 0: score += LEVEL_CHANCE_BONUS * p * enem_k
return score
def eval_move(self,orig,dest):
enemlocs = wail.get_enemy_destinations_by_unit().keys()
self.stats[0] += 1
score = 0.0
unit = wail.get_units().get(orig,None)
if not unit: return
unit_k = (LEVEL_SCORE*unit.type().level + BASE_UNIT_SCORE + LEADER_SCORE*unit.can_recruit\
+ FULL_XP_SCORE * unit.experience * 1.0 / unit.max_experience) * (BASE_COST_SCORE + unit.type().cost * COST_SCORE)
# subtract 1 because terrain might be a factor
speed = unit.type().movement - 1
attackable=False
if dest in self.enemdests:
attackable = True
else:
for adj in wail.get_adjacent_tiles(dest):
if adj in self.enemdests:
attackable = True
break
# capture villages
if dest in self.notmyvillages:
score += VILLAGE_SCORE
for team in wail.get_teams():
if team.owns_village(dest) and team.is_enemy:
score += ENEMY_VILLAGE_BONUS
bestdist=100
if unit.can_recruit:
# leader stays near keep
for keep in self.keeps:
dist=dest.distance_to(keep)
if dist<bestdist:
bestdist=dist
if dist<=1: break
else:
# everyone else moves toward enemy leader
for loc,enem in wail.get_units().iteritems():
if enem.is_enemy and enem.can_recruit:
dist=dest.distance_to(loc)
if dist<bestdist:
bestdist=dist
if dist<=1: break
if bestdist > 1:
for vil in self.notmyvillages:
if dest==vil: continue
dist=dest.distance_to(vil)
if dist<bestdist:
bestdist=dist
if dist<=1: break
score += (1.0 * speed) / (bestdist + speed)
# healing
# I am ignoring the value of healers, and regenerating units. I don't think unit abilities
# are correctly reported by the API, anyway.
if (unit.poisoned or unit.hitpoints<unit.max_hitpoints) and wail.get_map().is_village(dest):
if unit.poisoned: healing = HEAL_POISON
else:
healing = unit.max_hitpoints-unit.hitpoints
if healing > 8: healing = 8
# reduce the healing bonus if we might get killed first
if attackable: healing *= HEAL_ATTACKABLE
score += HEAL_FACTOR * healing * unit_k / unit.max_hitpoints
if attackable:
# defense
score -= unit.defense_modifier(dest) * DEFENSE_FACTOR
# garrison
if wail.get_map().is_village(dest): score += GARRISON_SCORE
# reduce chances of standing next to a unit without attacking for a whole turn
if dest == orig:
score -= NO_MOVE_PEN
for target in wail.get_adjacent_tiles(dest):
if target in enemlocs:
score -= STAND_NEXT_TO_ENEMY_PEN
break
else:
for target in wail.get_adjacent_tiles(dest):
if target in enemlocs:
score -= NEXT_TO_ENEMY_PEN
break
# end mod
return score
import sys
print "Running bruteforce wail ai."
print "Wesnoth", wesnoth.get_version()
print "Python", sys.version
st = time.time()
ai = AI()
ai.recruit()
while True:
if not ai.do_one_move():
break
ai.recruit()
ai.report_stats()
print "======================="
print "bruteforce ran for %0.4f-seconds." % (time.time() - st)
print "======================="

View File

@ -1,91 +0,0 @@
#!/usr/bin/python
# This is *not* a python AI, it's just run as AI so it can get access to
# Python's runtime documentation. This documentation then simply is dumped to
# stdout in a format ready to be pasted to the wiki.
def myhelp(topic, topics):
"""Collect all the help topics into the given list."""
doc = getattr(eval(topic), "__doc__")
subtopics = []
for subtopic in getattr(eval(topic), "__dict__", []):
if subtopic.startswith("_"): continue
myhelp(topic + "." + subtopic, subtopics)
tc = getattr(eval(topic), "__class__", None)
tt = getattr(tc, "__name__", None)
if topic != "wesnoth.error":
topics.append((topic, tt, doc, subtopics))
def output(topics, level):
"""Output the given topics in wiki format, in a given heading level."""
color = 0
topics.sort()
for topic, tt, doc, subtopics in topics:
dot = topic.rfind(".")
if level == 1:
print "==", topic[dot + 1:], "module reference =="
print "''This is an automatically generated reference, but feel " +\
"free to edit it - changes will not be overwritten but " +\
"instead reviewed and included in the next version.''"
print doc or "..."
if subtopics:
funcs = []
others = []
for s in subtopics:
if s[1] == "builtin_function_or_method":
funcs.append(s)
else:
others.append(s)
if funcs:
print "=== Functions ==="
print "{|"
output(funcs, 3)
print "|}"
output(others, 2)
elif level == 2:
print "===", topic[dot + 1:], "==="
print doc or "..."
if subtopics:
print "{|"
output(subtopics, 3)
print "|}"
elif level == 3:
options = " valign='top'"
if color: options += " bgcolor='#FBF5EA'"
print "|-" + options
color = not color
if tt in ["method_descriptor", "builtin_function_or_method"]:
suffix = ""
prefix = ""
if doc and doc.startswith("Parameters:"):
l = doc.find("\n")
if l == -1: l = len(doc) - 1
suffix = "(" + doc[11:l].strip() + ")"
doc = doc[l + 1:]
else:
suffix = "()"
if doc and doc.startswith("Returns:"):
l = doc.find("\n")
if l == -1: l = len(doc) - 1
prefix = doc[8:l].strip() + " = "
doc = doc[l + 1:]
print "|'''%s()'''" % topic[dot + 1:]
print "|<code>%s%s%s</code>\n\n" % (prefix, topic[dot + 1:], suffix) +\
(doc and doc.replace("\n", "\n\n") or "...")
else:
print "|'''%s'''\n|%s" % (topic[dot + 1:],
(doc and doc.replace("\n", "\n\n") or "..."))
if __name__ == "__main__":
import os
# If we are run as script, run wesnoth with the --python-api switch.
os.system("src/wesnoth --python-api")
else:
# If we are run as a python script, output the documentation to stdout.
import ai as wesnoth
topics = []
myhelp("wesnoth", topics)
output(topics, 1)

View File

@ -1,57 +0,0 @@
import re, os, safe
whitelisted = ["ai", "collections", "functools", "heapq", "math", "Queue", "re", "sets", "string", "threading", "time", "wail", "wesnoth"]
rex = re.compile(r"^import\s+(.*)", re.M)
modules = {}
def include(matchob):
"""
Regular expression callback. Handles a single import statement, returning
the included code.
"""
names = [x.strip() for x in matchob.group(1).split(",")]
r = ""
for name in names:
alias = None
if " as " in name:
(name, ignored, alias) = name.split(' ')
if name in whitelisted:
if alias:
modules[alias] = __import__(name)
else:
modules[name] = __import__(name)
continue
for path in pathes:
includefile = os.path.join(path, name)
try:
code = parse_file(includefile + ".py")
break
except IOError:
pass
else:
raise safe.SafeException("Could not import '%s'." % name)
r += code
return r
def parse_file(name):
"""
Simple pre-parsing of scripts, all it does is allow importing other scripts.
"""
abspath = os.path.abspath(name)
if abspath in already: return ""
already[abspath] = 1
code = file(abspath).read().replace(chr(13), "")
code = rex.sub(include, code)
return code
# If you want to disable safe python, use this instead:
#
# def parse(name): return open(name).read(), {}
def parse(name):
global already, modules
already = {}
modules = {}
return parse_file(name), modules

View File

@ -1,168 +0,0 @@
"""An attempt at creating a safe_exec for python.
This file is public domain and is not suited for any serious purpose.
This code is not guaranteed to work. Use at your own risk!
Beware! Trust no one!
Please e-mail philhassey@yahoo.com if you find any security holes.
svn://www.imitationpickles.org/pysafe/trunk
See README.txt, NOTES.txt, CHANGES.txt for more details.
"""
import compiler
import __builtin__
class SafeException(Exception):
"""Base class for Safe Exceptions"""
def __init__(self,*value):
self.value = str(value)
def __str__(self):
return self.value
class CheckNodeException(SafeException):
"""AST Node class is not in the whitelist."""
pass
class CheckStrException(SafeException):
"""A string in the AST looks insecure."""
pass
class RunBuiltinException(SafeException):
"""During the run a non-whitelisted builtin was called."""
pass
_NODE_CLASS_OK = [
'Add', 'And', 'AssAttr', 'AssList', 'AssName', 'AssTuple',
'Assert', 'Assign','AugAssign', 'Bitand', 'Bitor', 'Bitxor', 'Break',
'CallFunc', 'Class', 'Compare', 'Const', 'Continue',
'Dict', 'Discard', 'Div', 'Ellipsis', 'Expression', 'FloorDiv',
'For', 'From', 'Function', 'Getattr', 'If', 'Keyword', 'Lambda',
'LeftShift', 'List', 'ListComp', 'ListCompFor', 'ListCompIf', 'Mod',
'Module', 'Mul', 'Name', 'Node', 'Not', 'Or', 'Pass', 'Power',
'Print', 'Printnl', 'Raise', 'Return', 'RightShift', 'Slice', 'Sliceobj',
'Stmt', 'Sub', 'Subscript', 'TryExcept', 'Tuple', 'UnaryAdd',
'UnarySub', 'While',
]
_NODE_ATTR_OK = []
# Expanded to allow repr, str, call, and doc. These are commonly overloaded
# to provided fundamental functionality. Without __call__ support, most
# categories of decorators are simply impossible.
_STR_OK = [ '__call__', '__copy__', '__deepcopy__',
'__init__', '__name__', '__repr__', '__str__' ]
# If we put '__' in _STR_NOT_CONTAIN, then we can't have defacto private data
_STR_NOT_CONTAIN = []
_STR_NOT_END = [ '__' ]
_STR_NOT_BEGIN = ['im_','func_','tb_','f_','co_',]
## conservative settings
#_NODE_ATTR_OK = ['flags']
#_STR_NOT_CONTAIN = ['_']
#_STR_NOT_BEGIN = []
def _check_node(node):
if node.__class__.__name__ not in _NODE_CLASS_OK:
raise CheckNodeException(node.lineno,node.__class__.__name__)
for k,v in node.__dict__.items():
if k in _NODE_ATTR_OK: continue
if v in _STR_OK: continue
if type(v) not in [str,unicode]: continue
for s in _STR_NOT_CONTAIN:
if s in v: raise CheckStrException(node.lineno,k,v)
for s in _STR_NOT_BEGIN:
if v[:len(s)] == s: raise CheckStrException(node.lineno,k,v)
for s in _STR_NOT_END:
if v.endswith( s ): raise CheckStrException( node.lineno, k, v )
for child in node.getChildNodes():
_check_node(child)
def _check_ast(code):
ast = compiler.parse(code)
_check_node(ast)
_BUILTIN_OK = [
'__debug__','quit','exit',
'Warning', 'restricted',
'None','True','False',
'abs', 'bool', 'callable', 'chr', 'cmp', 'complex', 'dict', 'divmod', 'filter',
'float', 'frozenset', 'hash', 'hex', 'int', 'isinstance', 'issubclass', 'len',
'list', 'long', 'map', 'max', 'min', 'object', 'oct', 'ord', 'pow', 'range',
'repr', 'reversed', 'round', 'set', 'slice', 'str', 'sum', 'super', 'tuple', 'xrange', 'zip',
'ArithmeticError', 'AssertionError', 'AttributeError', 'BaseException', 'Exception',
'IndexError', 'KeyError', 'NameError', 'RuntimeError', 'RuntimeWarning', 'StopIteration',
'ValueError', 'ZeroDivisionError'
]
_BUILTIN_STR = [
'copyright','credits','license','__name__','__doc__',
]
def _builtin_fnc(k):
def fnc(*vargs,**kargs):
raise RunBuiltinException(k)
return fnc
_builtin_globals = None
_builtin_globals_r = None
def _builtin_init():
global _builtin_globals, _builtin_globals_r
if _builtin_globals != None: return
_builtin_globals_r = __builtin__.__dict__.copy()
r = _builtin_globals = {}
for k in __builtin__.__dict__.keys():
v = None
if k in _BUILTIN_OK:
v = __builtin__.__dict__[k]
elif k in _BUILTIN_STR:
v = ''
else:
v = _builtin_fnc(k)
r[k] = v
def _builtin_destroy():
_builtin_init()
for k,v in _builtin_globals.items():
__builtin__.__dict__[k] = v
def _builtin_restore():
for k,v in _builtin_globals_r.items():
__builtin__.__dict__[k] = v
def safe_check(code):
"""Check the code to be safe."""
return _check_ast(code)
def safe_run(code,context=None):
"""Exec code with only safe builtins on."""
if context == None: context = {}
_builtin_destroy()
try:
#exec code in _builtin_globals,context
context['__builtins__'] = _builtin_globals
exec code in context
_builtin_restore()
except:
_builtin_restore()
raise
def safe_exec_noop( code, context=None ):
exec code in context
def safe_exec_op( code, context=None ):
safe_check( code )
safe_run( code, context )
# Wrapper allowing safe_exec to be dynamically controlled
# from wesnoth binary.
def safe_exec( code, context=None, runSafe=True ):
context[ 'restricted' ] = runSafe
if runSafe:
safe_exec_op( code, context )
else:
safe_exec_noop( code, context )

View File

@ -1,405 +0,0 @@
#!WPY
"""This is a rather simple minded example of a python AI."""
import ai as wesnoth
import heapq
def pos(location):
"""Just a helper function for printing positions in debug messages."""
return "(%d, %d)" % (1 + location.x, 1 + location.y)
def debug(string):
pass
class AI:
"""A class representing our AI."""
def __init__(self):
"""This class is constructed once for each turn of the AI. To get
persistent variables across terms, which also are saved when the game is
saved, use set_variable and get_variable."""
self.team = wesnoth.get_current_team()
self.village_radius = 25
self.scout_villages = 3
self.recruit()
self.fight()
self.conquer()
def conquer(self):
"""Try to capture villages."""
villages = self.find_villages()
units = wesnoth.get_destinations_by_unit().keys()
# Construct a sorted list of (distance, unit, village) triples.
queue = []
for village in villages:
for unit in units:
d = self.get_distance(unit, village)
if d != None: heapq.heappush(queue, (d, unit, village))
# Now assign units to villages, and move them.
while queue:
d, unit, village = heapq.heappop(queue)
if unit in units and village in villages:
units.remove(unit)
villages.remove(village)
self.go_to(unit, village)
if not units: break
if not villages: break
def cumulate_damage(self, cumulated, hitpoints, new_damage):
cumulated2 = {}
for already, ap in cumulated.iteritems():
for hp, probability in new_damage.iteritems():
damage = int(already + hitpoints - hp)
cumulated2[damage] = cumulated2.get(damage, 0) + ap * probability
return cumulated2
def danger_estimate(self, unit, where, enemy):
"""Get some crude indication about how unsafe it is for unit to get
attacked by enemy at where."""
scores = []
u = wesnoth.get_units()[unit]
e = wesnoth.get_units()[enemy]
u_defense = u.defense_modifier(wesnoth.get_map(), where)
e_defense = e.defense_modifier(wesnoth.get_map(), enemy)
u_bonus = 100 - (u.type().alignment - 1) * wesnoth.get_gamestatus().lawful_bonus
e_bonus = 100 - (e.type().alignment - 1) * wesnoth.get_gamestatus().lawful_bonus
for attack in e.attacks():
score = attack.damage * attack.num_attacks * e_bonus / 100
score *= u_defense
score *= u.damage_against(attack) / 100
back = []
for retaliation in u.attacks():
if attack.range == retaliation.range:
x = retaliation.damage * retaliation.num_attacks * u_bonus / 100
x *= e_defense
x *= e.damage_against(retaliation) / 100
back.append(x)
if back:
r = max(back)
score -= r
heapq.heappush(scores, score)
return scores[0]
def danger(self, unit, location):
"""Try to estimate danger of moving unit to location."""
attackers = []
for enemy, destinations in wesnoth.get_enemy_destinations_by_unit():
for tile in wesnoth.get_adjacent_tiles(unit):
if tile in destinations:
heuristic = danger_estimate(unitm, location, enemy)
if heuristic > 0:
heapq.heappush(attackers, (-heuristic, enemy, tile))
result = 0
already = {}
while attackers:
danger, enemy, tile = heapq.heappop(attackers)
if not already[enemy] and not already[tile]:
danger = -danger
result += danger
already[enemy] = 1
already[tile] = 1
return result
def fight(self):
"""Attack enemies."""
enemies = wesnoth.get_enemy_destinations_by_unit().keys()
units = wesnoth.get_destinations_by_unit().keys()
# Get a list of all units we can possibly kill and their chance to kill.
# This is just a heuristic, ignoring ZoC and unit placement.
kills = []
for enemy in enemies:
e = wesnoth.get_units()[enemy]
k = {0: 1.0}
for unit, destinations in wesnoth.get_destinations_by_unit().iteritems():
u = wesnoth.get_units()[unit]
for tile in wesnoth.get_adjacent_tiles(enemy):
if tile in destinations:
own_hp, enemy_hp = u.attack_statistics(tile, enemy)
k = self.cumulate_damage(k, e.hitpoints, enemy_hp)
ctk = 0
for damage, p in k.iteritems():
if damage >= e.hitpoints:
ctk += p
if ctk:
heapq.heappush(kills, (-ctk, enemy))
# Now find positions from where own units can attack the to be killed
# enemies.
attacks = []
while kills:
ctk, enemy = heapq.heappop(kills)
e = wesnoth.get_units()[enemy]
ctk = -ctk
for tile in wesnoth.get_adjacent_tiles(enemy):
for unit in wesnoth.get_units_by_destination().get(tile, []):
u = wesnoth.get_units()[unit]
own_hp, enemy_hp = u.attack_statistics(tile, enemy)
score = e.hitpoints - sum([x[0] * x[1] for x in enemy_hp.iteritems()])
score -= u.hitpoints - sum([x[0] * x[1] for x in own_hp.iteritems()])
# This is so if there are two equally good attack
# possibilities, we chose the one on better terrain.
score *= 50 / u.defense_modifier(tile)
heapq.heappush(attacks, (-score, unit, tile, enemy))
#print own_hp, enemy_hp
debug("Score for %s at %s: %s<->%s: %f [%s]" % (u.name,
pos(unit), pos(tile), pos(enemy), score, e.name))
# Now assign units to enemies, and move and attack.
while attacks:
score, unit, tile, enemy = heapq.heappop(attacks)
score = -score
if unit in units and enemy in enemies:
#try:
loc = wesnoth.move_unit(unit, tile)
#except ValueError:
# loc = None
if loc == tile:
e = wesnoth.get_units()[enemy]
wesnoth.attack_unit(tile, enemy)
if not e.is_valid:
enemies.remove(enemy)
units.remove(unit)
if not units: break
def recruit(self):
"""Recruit units."""
# Check if there is any gold left first.
cheapest = min([x.cost for x in self.team.recruits()])
if self.team.gold < cheapest: return
# Find all keeps in the map.
keeps = self.find_keeps()
# Find our leader.
leader = None
for location, unit in wesnoth.get_units().iteritems():
if unit.side == self.team.side and unit.can_recruit:
leader = location
break
# Get number of villages to capture near to the leader.
villages = len([x for x in self.find_villages()
if leader.distance_to(x) < self.village_radius])
units_recruited = int(wesnoth.get_variable("units_recruited") or 0)
def attack_score(u1, u2):
"""Some crude score of u1 attacking u2."""
maxdeal = 0
for attack in u1.attacks():
deal = attack.damage * attack.num_attacks
deal *= u2.damage_from(attack) / 100.0
for defense in u2.attacks():
if attack.range == defense.range:
receive = defense.damage * defense.num_attacks
receive *= u1.damage_from(defense) / 100.0
deal -= receive
if deal > maxdeal: maxdeal = deal
return maxdeal
def recruit_score(recruit, speed, defense, aggression, resistance):
"""Score for recruiting the given unit type."""
vscout_minus_recruits = self.scout_villages - units_recruited
if vscout_minus_recruits == 0:
vscout_minus_recruits = 1 # prevent div-by-zero
need_for_speed = 3 * (villages / vscout_minus_recruits)
if need_for_speed < 0: need_for_speed = 0
v = speed * need_for_speed + defense * 0.1 + aggression + resistance
v += 1
if v < 1: v = 1
return v
# Try to figure out which units are good in this map.
map = wesnoth.get_map()
recruits = self.team.recruits()
recruits_list = []
for recruit in recruits:
speed = 0.0
defense = 0.0
n = map.x * map.y
for y in range(map.y):
for x in range(map.x):
location = wesnoth.get_location(x, y)
speed += recruit.movement_cost(location)
defense += 100 - recruit.defense_modifier(location)
speed = recruit.movement * n / speed
defense /= n
aggression = 0.0
resistance = 0.0
enemies = wesnoth.get_enemy_destinations_by_unit().keys()
n = len(enemies)
for location in enemies:
enemy = wesnoth.get_units()[location]
aggression += attack_score(recruit, enemy)
resistance -= attack_score(enemy, recruit)
if n > 0:
aggression /= n
resistance /= n
debug("%s: speed: %f, defense: %f, aggression: %f, resistance: %f" %
(recruit.name, speed, defense, aggression, resistance))
recruits_list.append((recruit, speed, defense, aggression, resistance))
# Now recruit.
for location, unit in wesnoth.get_units().iteritems():
if unit.side == self.team.side and unit.can_recruit:
keepsort = []
for keep in keeps:
heapq.heappush(keepsort, (location.distance_to(keep), keep))
keep = keepsort[0][1]
self.go_to(location, keep)
for i in range(6): # up to 6 units (TODO: can be more)
# Get a random, weighted unit type from the available.
heap = []
total_v = 0
for r in recruits_list:
v = recruit_score(*r)
v *= v * v
total_v += v
heapq.heappush(heap, (-v, r[0]))
r = wesnoth.get_random(0, total_v)
while 1:
v, recruit = heapq.heappop(heap)
debug("%d %d" % (r, v))
r += v
if r <= 0: break
# Try to recruit it on the adjacent tiles
# TODO: actually, it should just use the nearest possible
# location
for position in wesnoth.get_adjacent_tiles(location):
if wesnoth.recruit_unit(recruit.name, position):
break
else:
# was not possible -> we're done
break
units_recruited += 1
wesnoth.set_variable("units_recruited", str(units_recruited))
def find_villages(self):
"""Find all villages which are unowned or owned by enemies."""
villages = []
m = wesnoth.get_map()
for x in range(m.x):
for y in range(m.y):
location = wesnoth.get_location(x, y)
if wesnoth.get_map().is_village(location):
for team in wesnoth.get_teams():
# does it alreadey belong to use or an ally?
if team.owns_village(location) and not team.is_enemy:
break
else:
# no, either it belongs to an enemy or to nobody
villages.append(location)
return villages
def find_keeps(self):
"""Find keep locations."""
keeps = []
m = wesnoth.get_map()
for x in range(m.x):
for y in range(m.y):
location = wesnoth.get_location(x, y)
if wesnoth.get_map().is_keep(location):
keeps.append(location)
return keeps
def get_distance(self, location, target, must_reach = False):
"""Find out how many turns it takes the unit at location to reach target."""
if location == target: return 0
unit = wesnoth.get_units()[location]
path = unit.find_path(location, target, 100)
extra = 0
if not path:
extra = 1
if must_reach: return None
for adjacent in wesnoth.get_adjacent_tiles(target):
# Consider 5 turns worth of movement of this unit.
path = unit.find_path(location, adjacent,
unit.type().movement * 5)
if path: break
else:
return None
l = 0
for location in path:
l += unit.movement_cost(location)
l -= unit.movement_left
l /= unit.type().movement
l += 1 + extra
return l
def attack(self, location, enemy):
"""Attack an enemy unit."""
wesnoth.attack_unit(location, enemy)
def go_to(self, location, target, must_reach = False):
"""Make a unit at the given location go to the given target.
Returns the reached position.
"""
if location == target: return location
# If target is occupied, try to go near it
unit_locations = wesnoth.get_units().keys()
if target in unit_locations:
if must_reach: return location
adjacent = wesnoth.get_adjacent_tiles(target)
targets = [x for x in adjacent if not x in unit_locations]
if targets:
target = targets[0]
else:
return location
# find a path
for l, unit in wesnoth.get_units().iteritems():
if location == l:
path = unit.find_path(location, target, unit.type().movement * 5)
break
else:
return location
if path:
possible_destinations = wesnoth.get_destinations_by_unit().get(location, [])
if must_reach:
if not target in path: return location
if not target in possible_destinations: return location
# find first reachable position in reversed path
path.reverse()
for p in path:
if p in possible_destinations and not p in unit_locations:
location = wesnoth.move_unit(location, p)
return location
return location
#import sys
print "Running sample ai."
print "Wesnoth", wesnoth.get_version()
#print "Python", sys.version
AI()

View File

@ -1,28 +0,0 @@
# Wesnoth AI Library
#
__author__ = 'Greg Copeland'
__version__ = 0.1
__all__ = [ 'app', 'basetype', 'decorator', 'rwlock' ]
# Load __all__ wail namespace so that a simple 'import wail' gives
# access to them via wail.<name>
from wesnoth import *
print "============================================="
print "'wesnoth' module has been absorbed into wail."
print "============================================="
##import app
##import basetype
##import decorator
##import rwlock
# Fix wesnoth functions with decorated functions here to ensure
# safe locking throughout.
print "============================================="
print "wail module has been imported."
print "============================================="

View File

@ -1,216 +0,0 @@
from __future__ import with_statement
def callWith( *argTypes, **kw ):
"""
Validates the call arguments match the expressed signature.
"""
try:
def callDecorator( f ):
def newFunction( *args, **kw ):
assert len(args) == len(argTypes)
argTypeList = tuple( map( type, args ) )
if argTypes != argTypeList:
raise TypeError( "Called function with '%s' but '%s' is required." % \
(str(argTypeList), str(argTypes)) )
return f( *args, **kw )
newFunction.__name__ = f.__name__
return newFunction
return callDecorator
except KeyError, key:
raise KeyError, key + "is not a valid keyword argument"
except TypeError, msg:
raise TypeError, msg
def returnWith( retType, **kw ):
"""
Validates the data return is of a specific type. If it is not,
raise TypeError exception.
"""
try:
def returnDecorator( f ):
def newFunction( *args, **kw ):
result = f( *args, **kw )
resultType = type( result )
if resultType != retType:
raise TypeError( "Function returned '%s' but '%s' is required." % \
(str(resultType), str(retType)) )
return result
newFunction.__name__ = f.__name__
return newFunction
return returnDecorator
except KeyError, key:
raise KeyError, key + "is not a valid keyword argument"
except TypeError, msg:
raise TypeError, msg
class memoized( object ):
"""
Decorator that caches a function's return value each time it is called.
If called later with the same arguments, the cached value is returned, and
not re-evaluated.
"""
def __init__( self, func, lock ):
super( memoized, self ).__init__()
self.__func = func
self.__lock = lock
self.__cache = {}
self.__hits = 0L
self.__misses = 0L
self.__name__ = func.__name__
self.__doc__ = func.__doc__
self.__str__ = func.__str__
self.__repr__ = func.__repr__
def __call__( self, *args, **kw ):
try:
funcKey = ( args, tuple( kw.iteritems() ) )
with self.__lock:
result = self.__cache[ funcKey ]
self.__hits += 1L
return result
except KeyError:
funcKey = ( args, tuple( kw.iteritems() ) )
with self.__lock:
result = self.__cache[ funcKey ] = self.__func( *args, **kw )
self.__misses += 1L
return result
except TypeError:
# uncachable -- for instance, passing a list as an argument.
# Better to not cache than to blow up entirely.
with self.__lock:
result = self.__func( *args, **kw )
self.__misses += 1L
return result
def _realCall( self, *args, **kw ):
"""Bypass the caching logic directly. Statistics are unchanged."""
with self.__lock:
return self.__func( *args, **kw )
def getStats( self ):
"""
Provide cache results, as a tuple, in the form of
(total calls, misses, hits).
"""
with self.__lock:
return (self.__hits + self.__misses, self.__hits, self.__misses)
def reset( self ):
"""
Reset the cache. Use when results of underlying function call may
have changed.
"""
with self.__lock:
self.__hits = 0L
self.__misses = 0L
self.__cache.clear()
def withLockAndReset( funcWrite, funcCache, lock, *args, **kw ):
def withLockAndResetCallWrapper( *args, **kw):
with lock:
if isinstance( funcCache, tuple ) or \
isinstance( funcCache, list ):
for func in funcCache:
func.reset()
else:
funcCache.reset()
return funcWrite( *args, **kw )
## Make the returned function look like the real thing
withLockAndResetCallWrapper.__name__ = funcWrite.__name__
withLockAndResetCallWrapper.__doc__ = funcWrite.__doc__
withLockAndResetCallWrapper.__str__ = funcWrite.__str__
withLockAndResetCallWrapper.__repr__ = funcWrite.__repr__
return withLockAndResetCallWrapper
def withLock( func, lock, *args, **kw ):
def withLockWrapper( *args, **kw ):
with lock:
return func( *args, **kw )
withLockWrapper.__name__ = func.__name__
withLockWrapper.__doc__ = func.__doc__
withLockWrapper.__str__ = func.__str__
withLockWrapper.__repr__ = func.__repr__
return withLockWrapper
##class memoized( object ):
## """
## Decorator that caches a function's return value each time it is called.
## If called later with the same arguments, the cached value is returned, and
## not re-evaluated.
## """
## def __init__( self, func ):
## super( memoized, self ).__init__()
## self.__func = func
## self.__cache = {}
## self.__hits = 0L
## self.__misses = 0L
## def __repr__( self ):
## """Return the function's docstring."""
## return self.__func.__doc__
## def __call__( self, *args, **kw ):
## try:
## funcKey = ( args, tuple( kw.iteritems() ) )
## result = self.__cache[ funcKey ]
## self.__hits += 1L
## return result
## except KeyError:
## funcKey = ( args, tuple( kw.iteritems() ) )
## retValue = self.__cache[ funcKey ] = self.__func( *args, **kw )
## self.__misses += 1L
## return retValue
## except TypeError:
## # uncachable -- for instance, passing a list as an argument.
## # Better to not cache than to blow up entirely.
## retValue = self.__func( *args, **kw )
## self.__misses += 1L
## return retValue
## def _realCall( self, *args, **kw ):
## """Bypass the caching logic directly. Statistics are unchanged."""
## return self.__func( *args, **kw )
## def getStats( self ):
## """
## Provide cache results.
## """
## return (self.__hits + self.__misses, self.__hits, self.__misses)
## def reset( self ):
## """
## Reset the cache. Use when results of underlying function call may
## have changed.
## """
## self.__cache.clear()

View File

@ -1,331 +0,0 @@
#!/usr/bin/env python
#
# A Reader/Writer lock implementation
# Copyright Greg Copeland, 2008 - 2009
# Released under GPL license for Wesnoth. See Wesnoth's
# licensing terms for this's module's license.
#
# This is a non-recursive, reader/writer (shared) lock implementation.
# Recursive use of these locks will result in deadlock. Upgrading and
# downgrading of locks (reader -> writer and writer -> reader) are possible
# for many use cases. Don't forget, if the same thread attempts to grab
# a writer lock while currently holding the same writer lock, deadlock is
# ensured.
#
# My implementation has been inspired by Dmitry Dvoinikov's <dmitry@targeted.org>
# (recusrive) shared lock implementation. Despite his implementation, my implementation
# is distinct. My implementation provides writer fifo order by default unless reader
# biases is enabled at lock creation time. My implementation attempts to honor fifo
# ordering on lock requests; thusly preventing reader/writer starvation, which is common
# to Dmitry's (and others) implementation. Likewise, my lock implementation is fully
# deterministic, with or without reader bias, while his implementation is not. My
# implementation also avoids a potential race condition, which Dmitry's potentially has
# when using timeouts.
#
# If I do say so, this implementation is pretty speedy and yet still has room for
# additional optimization.
#
from __future__ import with_statement
try:
import thread
except ImportError:
del _sys.modules[__name__]
raise
from threading import Event, currentThread
__all__ = [ "RWLock", "WithWriteLock" ]
_allocate_lock = thread.allocate_lock
del thread
class RWLock( object ):
"""
Non-recursive reader/writer (shared) lock.
"""
def __init__( self, readerBias=False, biasFactor=2 ):
super( RWLock, self ).__init__()
self._readerBias = readerBias
self._readerBiasFactor = biasFactor
self._owner = None
self._readers = []
self._pendWriters = []
self._pendReaders = []
self._biasedReads = 0L
self._readerBiasCount = 0L
self.__stateLock = _allocate_lock()
self.__eventPool = [ Event(), Event(), Event(), Event() ]
self.eventCount = len(self.__eventPool)
def __str__( self ):
with self.__stateLock:
if self._owner:
name = self._owner.getName()
else:
name = 'None'
readerNames = pendNames = writerNames = ''
for t in self._readers:
readerNames += t.getName() + ','
readerNames = '[' + readerNames[:-1] + ']'
for t, l in self._pendReaders:
pendNames += t.getName() + ','
pendNames = '[' + pendNames[:-1] + ']'
for t, l in self._pendWriters:
writerNames += t.getName() + ','
writerNames = '[' + writerNames[:-1] + ']'
result = "<RWLock; Owner:%s, Readers:%s, Pending:%s, Writers Pending:%s>" % \
(name, readerNames, pendNames, writerNames)
return result
def _getEventLock( self ):
"""
Method should be called with the lock's state lock
held.
"""
# If we have available an event semaphore, return it; otherwise create a new one
if self.__eventPool:
lock = self.__eventPool.pop()
else:
lock = Event()
self.eventCount += 1
return lock
def _returnEvent( self, lock ):
lock.clear()
self.__eventPool.append( lock )
def _honorReader( self ):
"""
Based on reader bias, determine if we will honor a writer's
position in the lock fifo order.
"""
retValue = False
if self._readerBias:
if self._readerBiasCount and \
self._readerBiasCount%self._readerBiasFactor == 0:
# No reader bias - it's time for writer to advance
self._readerBiasCount = 0L
else:
# Reader bias - writers lose their place this time
self._biasedReads += 1L
retValue = True
self._readerBiasCount += 1
return retValue
def _wakeThreads( self ):
"""
The internal state lock better be enabled before
this method is called. Otherwise internal state
corruption is likely.
"""
current = currentThread()
wakeWriter = wakeReaders = False
if not self._owner and not self._readers:
# No reader or writer lock in place - must advance someone
if not self._readers and self._pendWriters and \
not self._readerBias:
# No reader locks and pending writers and no reader bias
wakeWriter = True
elif self._pendReaders:
# No pending writers and have pending readers
wakeReaders = True
elif not self._readers and self._pendWriters:
# Now try to advance writers regardless of bias setting
wakeWriter = True
# Build list of locks to release
wakeList = []
if wakeWriter:
self._owner, lock = self._pendWriters.pop( 0 )
wakeList.append( lock )
elif wakeReaders:
for thrd, lock in self._pendReaders:
wakeList.append( lock )
self._readers.append( thrd )
del self._pendReaders[:]
if wakeList:
for lock in wakeList:
lock.set()
def _waitOnLock( self, lock, timeout, readLock=True ):
"""
Method must not be called with state lock held.
"""
retValue = True
if not timeout:
lock.wait()
else:
lock.wait( timeout )
current = currentThread()
with self.__stateLock:
retValue = lock.isSet()
if not retValue:
# Our lock failed
lockTuple = (current, lock)
if readLock and lockTuple in self._pendReaders:
self._pendReaders.remove( lockTuple )
else:
self._pendWriters.remove( lockTuple )
# Must always wake up threads when a failure occurs
# as someone was likely waiting for this thread to finish.
self._wakeThreads()
self._returnEvent( lock )
return retValue
def acquire( self, timeout=None ):
"""
Return True if lock is acquired, otherwise, return False.
"""
lock = None
retValue = True
current = currentThread()
with self.__stateLock:
if not self._owner:
if not self._pendWriters:
# No one waiting - fast path
self._readers.append( current )
elif self._readers and self._honorReader():
# No writer lock and no one queued to get a writer lock
# so fast path this lock - no wait. Or, we have pending
# writers and fifo order will not be honored because of
# readerBias setting.
self._readers.append( current )
else:
lock = self._getEventLock()
self._pendReaders.append( (current, lock) )
elif current is self._owner:
# This thread already holds a writer lock - so fast path
# a reader lock. Writer lock is not released.
self._readers.append( current )
else:
# All other cases we need to wait in line
# Because writer lock already exists!
lock = self._getEventLock()
self._pendReaders.append( (current, lock) )
if lock:
retValue = self._waitOnLock( lock, timeout )
return retValue
def acquireWriter( self, timeout=None ):
lock = None
retValue = True
lockTuple = None
createLock = True
current = currentThread()
with self.__stateLock:
# We will always create a lock unless no writer exists,
# no readers, no readers pending, and no writers pending,
# unless the only lock is a read lock granted to this thread.
if not self._owner:
if not self._pendReaders and \
not self._pendWriters:
# No pending readers or writers
if self._readers and \
self._readers == [current]:
# We have a reader lock issued but we own it - upgrade lock
createLock = False
elif not self._readers:
createLock = False
elif self._readers == [current]:
# Special lock upgrade - thread already has read lock and thread
# is the only reader so upgrade lock to writer.
createLock = False
# For most every case, we'll have to create a lock
# If we do, we must wait to be notified.
if createLock:
lock = self._getEventLock()
lockTuple = (current, lock)
self._pendWriters.append( lockTuple )
else:
# We didn't create a lock so change ownership
self._owner = current
if lock:
retValue = self._waitOnLock( lock, timeout, False )
return retValue
def __enter__( self, timeout=None ):
return self.acquire( timeout )
def release( self ):
with self.__stateLock:
try:
self._readers.remove( currentThread() )
self._wakeThreads()
except ValueError:
raise Exception, "error: release unlocked lock"
def releaseWriter( self ):
with self.__stateLock:
if currentThread() == self._owner:
self._owner = None
self._wakeThreads()
else:
raise Exception, "error: release unlocked lock"
def __exit__( self, *args, **kw ):
self.release()
def acquire( lock, timeout=None ):
lock.acquire()
return lock
def acquireWrite( lock, timeout=None ):
lock.acquireWrite()
return lock
class WithWriteLock( object ):
def __init__( self, lock ):
super( WithWriteLock, self ).__init__()
self.__lock = lock
def __str__( self ):
return str( self.__lock )
def __enter__( self, timeout=None ):
return self.__lock.acquireWriter( timeout )
def __exit__( self, *args, **kw ):
return self.__lock.releaseWriter()
def acquire( self, timeout=None ):
return self.__lock.acquireWriter( timeout )
def release( self ):
return self.__lock.releaseWriter()

View File

@ -1,687 +0,0 @@
#!/usr/bin/env python
#
# Most of these tests were shamelessly taken and adopted from
# Dmitry Dvoinikov's shared lock tests. As the lock implementations
# differ, the units tests were significantly changed but logically
# are identical. As Dmitry's lock implemetation is fully recursive,
# tests which require lock recursion have been removed.
#
# Special thanks to Dmitry for having his shared lock tests available.
# This test suite made it especially easy to find the tricky corner cases
# which plagued rwlock implementation.
#
from __future__ import with_statement
import time
from rwlock import RWLock, WithWriteLock
from threading import Event, currentThread
from thread import allocate_lock as _allocate_lock
if __name__ == '__main__':
import time
def reader( lock, *args, **kw ):
with lock:
print currentThread(), "reader is reading...", str(lock)
time.sleep( 8 )
print "reader has completed work. Exiting..."
time.sleep( 15 )
print str(lock)
print currentThread(), "lock released and is exiting"
def writer( lock, *args, **kw ):
with lock:
print currentThread(), "writer is writer...", str(lock)
time.sleep( 4 )
time.sleep( 15 )
print str(lock)
print currentThread(), "lock released and is exiting"
## rl = RWLock( readerBias=True, biasFactor=4 )
## rl = RWLock( readerBias=True, biasFactor=1 )
## rl = RWLock( readerBias=True, biasFactor=3 )
rl = RWLock()
wl = WithWriteLock( rl )
import threading
r1 = threading.Thread( name="r1", target=reader, args=(rl, 1) )
r2 = threading.Thread( name="r2", target=reader, args=(rl, 1) )
r3 = threading.Thread( name="r3", target=reader, args=(rl, 1) )
r4 = threading.Thread( name="r4", target=reader, args=(rl, 1) )
r5 = threading.Thread( name="r5", target=reader, args=(rl, 1) )
r6 = threading.Thread( name="r6", target=reader, args=(rl, 1) )
r7 = threading.Thread( name="r7", target=reader, args=(rl, 1) )
r8 = threading.Thread( name="r8", target=reader, args=(rl, 1) )
r9 = threading.Thread( name="r9", target=reader, args=(rl, 1) )
w1 = threading.Thread( name="w1", target=writer, args=(wl, 1) )
w2 = threading.Thread( name="w2", target=writer, args=(wl, 1) )
w3 = threading.Thread( name="w3", target=writer, args=(wl, 1) )
print "Starting basic reader/writer lock testing..."
r1.start()
r2.start()
r3.start()
time.sleep( 2 )
w1.start()
time.sleep( 2 )
w2.start()
r4.start()
r5.start()
r6.start()
w3.start()
time.sleep( 2 )
r7.start()
r8.start()
r9.start()
time.sleep( 15 )
print "Main thread is waiting on lock..."
print str(rl)
with wl:
print str(rl)
print "Main thread has obtained lock and is joining on workers..."
joinList = ( r1, r2, r3, r4, r5, r6, r7, r8, r9, w1, w2, w3 )
for t in joinList:
t.join()
print "All joins completed!"
print "Main thread has completed!"
print "reader bias:", rl._readerBias, "factor:", rl._readerBiasFactor, "with", rl._biasedReads
print str(rl)
print "event count", rl.eventCount
print "We're here, so basic testing worked! Extensively self-testing module...."
from threading import Thread
from time import sleep, time
from random import random, randint
from math import log10
Lock = _allocate_lock
log_lock = Lock()
def log(s):
log_lock.acquire()
try:
print s
finally:
log_lock.releaseWriter()
def deadlocks(f, t):
th = Thread(target = f)
th.setName("Thread")
th.setDaemon(1)
th.start()
th.join(t)
return th.isAlive()
def threads(n, *f):
start = time()
evt = Event()
ths = [ Thread(target = f[i % len(f)], args = (evt, )) for i in range(n) ]
for i, th in enumerate(ths):
th.setDaemon(1)
th.setName(f[i % len(f)].__name__)
th.start()
evt.set()
for th in ths:
th.join()
return time() - start
# simple test
print "simple test:",
currentThread().setName("MainThread")
lck = RWLock()
assert str(lck) == '<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>'
assert lck.acquireWriter()
assert str(lck) == '<RWLock; Owner:MainThread, Readers:[], Pending:[], Writers Pending:[]>'
lck.releaseWriter()
assert str(lck) == '<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>'
assert lck.acquire()
assert str(lck) == '<RWLock; Owner:None, Readers:[MainThread], Pending:[], Writers Pending:[]>'
lck.release()
assert str(lck) == '<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>'
try:
lck.releaseWriter()
except Exception, e:
assert str(e) == 'error: release unlocked lock'
else:
assert False
try:
lck.release()
except Exception, e:
assert str(e) == 'error: release unlocked lock'
else:
assert False
print "ok"
# same thread shared/exclusive upgrade test
print "same thread shared/exclusive upgrade test:",
lck = RWLock()
def upgrade():
# ex -> sh <- sh <- ex
assert str(lck) == '<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>'
assert lck.acquireWriter(), str(lck)
assert str(lck) == '<RWLock; Owner:Thread, Readers:[], Pending:[], Writers Pending:[]>', str(lck)
assert lck.acquire(), str(lck)
assert str(lck) == '<RWLock; Owner:Thread, Readers:[Thread], Pending:[], Writers Pending:[]>', str(lck)
lck.release()
assert str(lck) == '<RWLock; Owner:Thread, Readers:[], Pending:[], Writers Pending:[]>', str(lck)
lck.releaseWriter()
assert str(lck) == '<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>', str(lck)
# ex -> sh <- ex <- sh
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquireWriter(), str(lck)
assert str(lck) == "<RWLock; Owner:Thread, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquire(), str(lck)
assert str(lck) == "<RWLock; Owner:Thread, Readers:[Thread], Pending:[], Writers Pending:[]>", str(lck)
lck.releaseWriter()
assert str(lck) == "<RWLock; Owner:None, Readers:[Thread], Pending:[], Writers Pending:[]>", str(lck)
lck.release()
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
# sh -> ex <- ex <- sh
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquire(), str(lck)
assert str(lck) == "<RWLock; Owner:None, Readers:[Thread], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquireWriter(), str(lck)
assert str(lck) == "<RWLock; Owner:Thread, Readers:[Thread], Pending:[], Writers Pending:[]>", str(lck)
lck.releaseWriter()
assert str(lck) == "<RWLock; Owner:None, Readers:[Thread], Pending:[], Writers Pending:[]>", str(lck)
lck.release()
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
# sh -> ex <- sh <- ex
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquire(), str(lck)
assert str(lck) == "<RWLock; Owner:None, Readers:[Thread], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquireWriter(), str(lck)
assert str(lck) == "<RWLock; Owner:Thread, Readers:[Thread], Pending:[], Writers Pending:[]>", str(lck)
lck.release()
assert str(lck) == "<RWLock; Owner:Thread, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
lck.releaseWriter()
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert not deadlocks(upgrade, 2.0)
print "ok"
# timeout test
print "timeout test:",
# exclusive/exclusive timeout
lck = RWLock()
wlck = WithWriteLock( lck )
def f(evt):
evt.wait()
with wlck:
assert str(lck) == "<RWLock; Owner:f, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
sleep(1.0)
assert str(lck) == "<RWLock; Owner:f, Readers:[], Pending:[], Writers Pending:[g]>", str(lck)
def g(evt):
evt.wait()
sleep(0.5)
assert str(lck) == "<RWLock; Owner:f, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert not lck.acquireWriter(0.1)
assert str(lck) == "<RWLock; Owner:f, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquireWriter(0.5)
assert str(lck) == "<RWLock; Owner:g, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
lck.releaseWriter()
threads(2, f, g)
print "ok,",
# shared/shared no timeout
lck = RWLock()
def f(evt):
evt.wait()
with lck:
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[]>", str(lck)
sleep(1.0)
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[]>", str(lck)
def g(evt):
evt.wait()
sleep(0.5)
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquire(0.1)
assert str(lck) == "<RWLock; Owner:None, Readers:[f,g], Pending:[], Writers Pending:[]>", str(lck)
lck.release()
threads(2, f, g)
print "ok,",
# exclusive/shared timeout
lck = RWLock()
wlck = WithWriteLock( lck )
def f(evt):
evt.wait()
with wlck:
assert str(lck) == "<RWLock; Owner:f, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
sleep(1.0)
assert str(lck) == "<RWLock; Owner:f, Readers:[], Pending:[g], Writers Pending:[]>", str(lck)
def g(evt):
evt.wait()
sleep(0.5)
assert str(lck) == "<RWLock; Owner:f, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert not lck.acquire(0.1)
assert str(lck) == "<RWLock; Owner:f, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquire(0.5)
assert str(lck) == "<RWLock; Owner:None, Readers:[g], Pending:[], Writers Pending:[]>", str(lck)
lck.release()
threads(2, f, g)
print "ok,",
# shared/exclusive timeout
lck = RWLock( True )
def f(evt):
evt.wait()
with lck:
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[]>", str(lck)
sleep(1.0)
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[g]>", str(lck)
def g(evt):
evt.wait()
sleep(0.5)
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[]>", str(lck)
assert not lck.acquireWriter(0.1)
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[]>", str(lck)
assert lck.acquireWriter(0.5)
assert str(lck) == "<RWLock; Owner:g, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
lck.releaseWriter()
threads(2, f, g)
print "ok"
# different threads shared/exclusive upgrade test
print "different threads shared/exclusive upgrade test:",
lck = RWLock()
wlck = WithWriteLock( lck )
def f(evt):
evt.wait()
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
with lck:
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[]>", str(lck)
sleep(3.0)
def g(evt):
evt.wait()
sleep(1.0)
assert str(lck) == "<RWLock; Owner:None, Readers:[f], Pending:[], Writers Pending:[]>", str(lck)
with lck:
assert str(lck) == "<RWLock; Owner:None, Readers:[f,g], Pending:[], Writers Pending:[]>", str(lck)
sleep(3.0)
assert str(lck) == "<RWLock; Owner:None, Readers:[g], Pending:[], Writers Pending:[h]>", str(lck)
with wlck:
assert str(lck) == "<RWLock; Owner:g, Readers:[g], Pending:[], Writers Pending:[h]>", str(lck)
def h(evt):
evt.wait()
sleep(2.0)
assert str(lck) == "<RWLock; Owner:None, Readers:[f,g], Pending:[], Writers Pending:[]>", str(lck)
with wlck:
assert str(lck) == "<RWLock; Owner:h, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
sleep(1.0)
threads(3, f, g, h)
print "ok"
# different threads exclusive/exclusive deadlock test
print "different threads exclusive/exclusive deadlock test:",
lck = RWLock()
def deadlock(evt):
lck.acquireWriter()
assert deadlocks(lambda: threads(2, deadlock), 2.0)
print "ok"
# different thread shared/exclusive deadlock test
print "different threads shared/exclusive deadlock test:",
lck = RWLock()
def deadlock1(evt):
lck.acquireWriter()
def deadlock2(evt):
lck.acquire()
assert deadlocks(lambda: threads(2, deadlock1, deadlock2), 2.0)
print "ok"
# different thread shared/shared deadlock test
print "different threads shared/shared no deadlock test:",
lck = RWLock( True )
def deadlock(evt):
lck.acquire()
assert not deadlocks(lambda: threads(2, deadlock), 2.0)
print "ok"
# exclusive interlock + timing test
print "exclusive interlock + serialized timing test:",
lck = RWLock( True )
wlck = WithWriteLock( lck )
val = 0
def exclusive(evt):
evt.wait()
global val
for i in range(10):
with wlck:
assert val == 0
val += 1
sleep(0.05 + random() * 0.05)
assert val == 1
val -= 1
sleep(0.05 + random() * 0.05)
assert val == 0
assert threads(4, exclusive) > 0.05 * 2 * 10 * 4
print "ok"
# shared non-interlock timing test
print "shared parallel timing test:",
lck = RWLock( True )
def shared(evt):
evt.wait()
for i in range(10):
with lck:
sleep(0.1)
assert threads(10, shared) < 0.1 * 10 + 4.0
print "ok"
# shared/exclusive test
print "multiple exclusive/shared threads busy loops:"
lck, shlck = RWLock(), Lock()
wlck = WithWriteLock( lck )
ex, sh, start, t = 0, 0, time(), 10.0
def exclusive(evt):
global ex, start, t
evt.wait()
i = 0
while i % 100 != 0 or start + t > time():
i += 1
lck.acquireWriter()
try:
ex += 1
finally:
lck.releaseWriter()
def shared(evt):
global sh, start, t
evt.wait()
i = 0
while i % 100 != 0 or start + t > time():
i += 1
lck.acquireWriter()
try:
with shlck:
sh += 1
finally:
lck.releaseWriter()
# even distribution
print "2wr/2rd:",
ex, sh, start = 0, 0, time()
assert 10.0 < threads(4, exclusive, exclusive, shared, shared) < 12.0
print "%d/%d:" % (ex, sh),
assert abs(log10(float(ex) / float(sh))) < 1.3
print "ok"
# exclusive starvation
print "1wr/3rd:",
ex, sh, start = 0, 0, time()
assert 10.0 < threads(4, exclusive, shared, shared, shared) < 12.0
print "%d/%d:" % (ex, sh),
assert abs(log10(float(ex) / float(sh))) < 1.3
print "ok"
# shared starvation
print "3wr/1rd:",
ex, sh, start = 0, 0, time()
assert 10.0 < threads(4, exclusive, exclusive, exclusive, shared) < 12.0
print "%d/%d:" % (ex, sh),
assert abs(log10(float(ex) / float(sh))) < 1.3
print "ok"
print "exhaustive timed (30-seconds) test - nonrecursive",
lck = RWLock()
start, t = time(), 30.0
def f( e ):
global start, t
e.wait()
lckCnt = 0L
lckBalance = 0L
while start + t > time():
# Create some locks!
j = randint(0, 1)
if j == 0:
jack = lck.acquireWriter( *(randint(0, 1) == 0 and (random()/4, ) or ()) )
else:
jack = lck.acquire( *(randint(0, 1) == 0 and (random()/4, ) or ()) )
sleep( random() * 0.005 )
# Release our lock
if jack:
lckBalance += 1L
lckCnt += 1L
if j == 0:
lck.releaseWriter()
lckBalance -= 1L
else:
lck.release()
lckBalance -= 1L
assert lckBalance == 0
f0 = lambda evt: f(evt);
f1 = lambda evt: f(evt);
f2 = lambda evt: f(evt);
f3 = lambda evt: f(evt);
f4 = lambda evt: f(evt);
f5 = lambda evt: f(evt);
f6 = lambda evt: f(evt);
f7 = lambda evt: f(evt);
f8 = lambda evt: f(evt);
f9 = lambda evt: f(evt);
threads( 10, f0, f1, f2, f3, f4, f5, f6, f7, f8, f9 )
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
print "ok"
print "exhaustive timed (30-seconds) test w/bias - nonrecursive",
lck = RWLock( True )
start, t = time(), 30.0
threads( 10, f0, f1, f2, f3, f4, f5, f6, f7, f8, f9 )
assert str(lck) == "<RWLock; Owner:None, Readers:[], Pending:[], Writers Pending:[]>", str(lck)
print "ok"
# heavy threading test
# specific anti-owners scenario (users cooperate by passing the lock
# to each other to make owner starve to death)
print "shareds cooperate in attempt to make exclusive starve to death:",
lck, shlck, hold = RWLock(), Lock(), 0
evtlock, stop = Event(), Event()
def user(evt):
evt.wait()
try:
while not stop.isSet():
with lck:
evtlock.set()
with shlck:
global hold
hold += 1
sleep(random() * 0.4)
waited = time()
while time() - waited < 3.0:
with shlck:
if hold > 1:
hold -= 1
break
if time() - waited >= 3.0: # but in turn they lock themselves
raise Exception("didn't work")
sleep(random() * 0.1)
except Exception, e:
assert str(e) == "didn't work"
def owner(evt):
evt.wait()
evtlock.wait()
lck.acquireWriter()
lck.releaseWriter()
stop.set()
assert not deadlocks(lambda: threads(5, owner, user, user, user, user), 10.0)
print "ok"
print "benchmark: writer",
lck, ii = RWLock(), 0
start = time()
while time() - start < 5.0:
for i in xrange(100):
lck.acquireWriter()
ii += 1
lck.releaseWriter()
print "%d empty lock/unlock cycles per second" % (ii / 5),
print "ok"
print "benchmark: writer w/wrapper",
lck, ii = RWLock(), 0
wlck = WithWriteLock( lck )
start = time()
while time() - start < 5.0:
for i in xrange(100):
with lck:
ii += 1
print "%d empty lock/unlock cycles per second" % (ii / 5),
print "ok"
print "benchmark: reader",
lck, ii = RWLock(), 0
start = time()
while time() - start < 5.0:
for i in xrange(100):
with lck:
ii += 1
print "%d empty lock/unlock cycles per second" % (ii / 5),
print "ok"
print "benchmark: read w/bias",
lck, ii = RWLock( True ), 0
start = time()
while time() - start < 5.0:
for i in xrange(100):
with lck:
ii += 1
print "%d empty lock/unlock cycles per second" % (ii / 5),
print "ok"
# all ok
print "all ok"
#

View File

@ -53,11 +53,6 @@ set( tools-external-libs
${SDLIMAGE_LIBRARY}
)
if(ENABLE_PYTHON AND PYTHON_LIBRARY)
include_directories( ${PYTHON_INCLUDE_PATH} )
set(game-external-libs ${game-external-libs} ${PYTHON_LIBRARIES})
endif(ENABLE_PYTHON AND PYTHON_LIBRARY)
if(ENABLE_FRIBIDI AND FRIBIDI_LIBRARIES)
include_directories( ${FRIBIDI_INCLUDE_PATH} )
set(game-external-libs ${game-external-libs} ${FRIBIDI_LIBRARIES})
@ -208,7 +203,6 @@ SET(wesnoth-main_SRC
ai_dfool.cpp
ai_attack.cpp
ai_move.cpp
ai_python.cpp
ai_village.cpp
animated_game.cpp
attack_prediction.cpp

View File

@ -34,7 +34,7 @@ pkgdatadir=$(datadir)/@DATADIR@
INTERNALLIBS = -lwesnoth-core -lwesnoth
THELIBS = -L. $(SDL_IMAGE_LIBS) $(SDL_MIXER_LIBS) $(SDL_NET_LIBS) \
$(SDL_TTF_LIBS) $(SDL_LIBS) $(PYTHON_LIBS) $(LIBINTL) \
$(SDL_TTF_LIBS) $(SDL_LIBS) $(LIBINTL) \
$(BOOST_IOSTREAMS_LIBS) $(BOOST_REGEX_LIBS)
wesnoth_source = \
@ -46,7 +46,6 @@ wesnoth_source = \
ai_dfool.cpp \
ai_attack.cpp \
ai_move.cpp \
ai_python.cpp \
ai_village.cpp \
animated_game.cpp \
attack_prediction.cpp \
@ -199,7 +198,7 @@ endif
wesnoth_SOURCES = \
game.cpp \
$(wesnoth_source)
wesnoth_LDADD = $(INTERNALLIBS) $(THELIBS) $(PANGO_LIBS) $(FONTCONFIG_LIBS)
wesnoth_DEPENDENCIES=libwesnoth-core.a libwesnoth.a
@ -229,7 +228,7 @@ wesnothd_SOURCES = \
time.cpp \
loadscreen_empty.cpp
wesnothd_LDADD = -L. -lwesnoth-core $(BOOST_IOSTREAMS_LIBS) @SDL_NET_LIBS@ @SDL_LIBS@ $(LIBINTL)
wesnothd_DEPENDENCIES=libwesnoth-core.a
@ -414,11 +413,6 @@ AM_CXXFLAGS = -DHAVE_REVISION -I../intl -I$(top_srcdir)/intl @SDL_CFLAGS@ -DWES
AM_CFLAGS = -DHAVE_REVISION -I../intl -I$(top_srcdir)/intl @SDL_CFLAGS@ -DWESNOTH_PATH=\"$(pkgdatadir)\" \
-DLOCALEDIR=\"$(LOCALEDIR)\" -DHAS_RELATIVE_LOCALEDIR=$(HAS_RELATIVE_LOCALEDIR)
if PYTHON
AM_CXXFLAGS += @PYTHON_CFLAGS@
AM_CFLAGS += @PYTHON_CFLAGS@
endif
if FRIBIDI
AM_CXXFLAGS += -DHAVE_FRIBIDI @FRIBIDI_CFLAGS@
AM_CFLAGS += -DHAVE_FRIBIDI @FRIBIDI_CFLAGS@

View File

@ -257,11 +257,7 @@ wesnoth_sources = Split("""
""")
wesnoth_sources.extend(env.Object("game_preferences_display.cpp", EXTRA_DEFINE = env["PLATFORM"] != "win32" and "WESNOTH_PREFIX='\"$prefix\"'" or None))
python_env = env.Clone()
if env['python']:
python_env.Append(CPPDEFINES = "HAVE_PYTHON")
wesnoth_sources.extend(python_env.Object(Split("""
ai_python.cpp
wesnoth_sources.extend(env.Object(Split("""
ai.cpp
config_cache.cpp
multiplayer_connect.cpp
@ -308,7 +304,7 @@ def WesnothProgram(env, target, source, can_build, **kw):
from SCons.Script.SConscript import SConsEnvironment
SConsEnvironment.WesnothProgram = WesnothProgram
game_cpp = python_env.Object("game.cpp", EXTRA_DEFINE = not env["pool_alloc"] and "DISABLE_POOL_ALLOC" or None);
game_cpp = env.Object("game.cpp", EXTRA_DEFINE = not env["pool_alloc"] and "DISABLE_POOL_ALLOC" or None);
env.WesnothProgram("wesnoth", [game_cpp] + [libwesnoth_extras, libwesnoth_core, libwesnoth_sdl, libwesnoth, libcampaignd, env["wesnoth_res"]], have_client_prereqs)
@ -378,7 +374,7 @@ test_sources = Split("""
tests/gui/test_save_dialog.cpp
tests/utils/play_scenario.cpp
""")
test_sources.extend(test_env.Object("tests/test_config_cache.cpp", EXTRA_DEFINE = env['python'] and "HAVE_PYTHON" or None))
test_sources.extend(test_env.Object("tests/test_config_cache.cpp"))
test = test_env.WesnothProgram("test", test_sources + [libwesnoth_extras, libwesnoth_core, libwesnoth_sdl, libwesnoth,libtest_utils], have_test_prereqs)

View File

@ -19,10 +19,6 @@
#include "ai2.hpp"
#include "ai_dfool.hpp"
#ifdef HAVE_PYTHON
//#include "python_ai.hpp"
#include "ai_python.hpp"
#endif
#include "array.hpp"
#include "dialogs.hpp"
#include "formula_ai.hpp"
@ -184,40 +180,6 @@ protected:
};
#ifdef HAVE_PYTHON
// Finds all python AI scripts available in the current binary path.
// They have to end with .py, and have #!WPY as first line.
// If preferences allow for unsafe python AIs, then also look for
// the #!UNSAFE_WPY tag.
static std::vector<std::string> get_available_py_scripts()
{
int allow_unsafe = !preferences::run_safe_python() ;
std::vector<std::string> scripts;
const std::vector<std::string>& paths = get_binary_paths("data");
for(std::vector<std::string>::const_iterator i = paths.begin(); i != paths.end(); ++i) {
std::vector<std::string> files;
get_files_in_dir(*i + "ai/python", &files, NULL, ENTIRE_FILE_PATH);
for(std::vector<std::string>::const_iterator j = files.begin(); j != files.end(); ++j) {
// file ends with .py
if (j->substr(j->length() - 3) == ".py") {
std::string name(j->substr(j->rfind("/") + 1)); // extract name
// read first line
std::ifstream s(j->c_str()); std::string mark; s >> mark; s.close();
if (mark == "#!WPY" &&
std::find(scripts.begin(), scripts.end(), name) == scripts.end())
scripts.push_back(name);
else if (allow_unsafe && mark == "#!UNSAFE_WPY" &&
std::find(scripts.begin(), scripts.end(), name) == scripts.end())
scripts.push_back(name);
}
}
}
return scripts;
}
#endif
std::vector<std::string> get_available_ais()
{
std::vector<std::string> ais;
@ -225,10 +187,6 @@ std::vector<std::string> get_available_ais()
ais.push_back("sample_ai");
//ais.push_back("idle_ai");
ais.push_back("dfool_ai");
#ifdef HAVE_PYTHON
std::vector<std::string> scripts = get_available_py_scripts();
ais.insert(ais.end(), scripts.begin(), scripts.end());
#endif
return ais;
}
@ -251,17 +209,6 @@ ai_interface* create_ai(const std::string& name, ai_interface::info& info)
// return new advanced_ai(info);
else if(name == "ai2")
return new ai2(info);
else if(name == "python_ai")
#ifdef HAVE_PYTHON
return new python_ai(info);
// else if(name == "newpy_ai")
// return new pyai::PythonAI( info ) ;
#else
{
ERR_AI << "No Python AI support available in this Wesnoth build! Using the default AI instead.\n";
return new ai(info);
}
#endif
else if(name != "" && name != "default") {
ERR_AI << "AI not found: '" << name << "'. Using default instead.\n";
}

File diff suppressed because it is too large Load Diff

View File

@ -1,102 +0,0 @@
/* $Id$ */
/*
Copyright (C) 2007 - 2009
Part of the Battle for Wesnoth Project http://www.wesnoth.org/
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2
or at your option any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY.
See the COPYING file for more details.
*/
/** @file ai_python.hpp */
#ifndef AI_PYTHON_HPP_INCLUDED
#define AI_PYTHON_HPP_INCLUDED
#include "ai_interface.hpp"
#include "game_end_exceptions.hpp"
#undef _POSIX_C_SOURCE // avoids a spurious compiler warning
#include <Python.h>
typedef struct {
PyObject_HEAD
const unit_type* unit_type_;
} wesnoth_unittype;
typedef struct {
PyObject_HEAD
const team* team_;
} wesnoth_team;
typedef struct {
PyObject_HEAD
const unit* unit_;
} wesnoth_unit;
#define W(name) static PyObject *wrapper_##name(PyObject* self, PyObject* args)
class python_ai : public ai_interface
{
public:
python_ai(ai_interface::info& info);
virtual ~python_ai();
virtual void play_turn();
static PyObject* wrapper_unit_movement_cost(wesnoth_unit*, PyObject* args);
static PyObject* wrapper_unit_defense_modifier(wesnoth_unit*, PyObject* args);
static PyObject* wrapper_unittype_movement_cost(wesnoth_unittype*, PyObject* args);
static PyObject* wrapper_unittype_defense_modifier(wesnoth_unittype*, PyObject* args);
W(team_targets);
W(get_units);
W(log_message);
W(log);
W(get_location);
W(get_map);
W(get_teams);
W(get_current_team);
W(get_src_dst);
W(get_dst_src);
W(get_enemy_src_dst);
W(get_enemy_dst_src);
W(move_unit);
W(attack_unit);
W(get_adjacent_tiles);
W(recruit_unit);
W(get_gamestatus);
W(set_variable);
W(get_variable);
W(get_version);
W(raise_user_interact);
W(test_move);
W(get_random);
static PyObject* unittype_advances_to( wesnoth_unittype* type, PyObject* args );
static PyObject* wrapper_team_recruits( wesnoth_team* team, PyObject* args );
static PyObject* wrapper_unit_find_path( wesnoth_unit* unit, PyObject* args );
static PyObject* wrapper_unit_attack_statistics(wesnoth_unit* unit, PyObject* args);
static bool is_unit_valid(const unit* unit);
std::vector<team>& get_teams() { return get_info().teams; }
static void initialize_python();
static void invoke(std::string name);
static int run_shell();
friend void recalculate_movemaps();
private:
static bool init_;
end_level_exception exception;
ai_interface::move_map src_dst_;
ai_interface::move_map dst_src_;
std::map<location,paths> possible_moves_;
ai_interface::move_map enemy_src_dst_;
ai_interface::move_map enemy_dst_src_;
std::map<location,paths> enemy_possible_moves_;
};
#undef W
#endif

View File

@ -77,10 +77,6 @@ namespace game_config {
if (game_config::small_gui)
defines_map_["SMALL_GUI"] = preproc_define();
#ifdef HAVE_PYTHON
defines_map_["PYTHON"] = preproc_define();
#endif
#if defined(__APPLE__)
defines_map_["APPLE"] = preproc_define();
#endif

View File

@ -73,10 +73,6 @@
//#include "locale.h"
//#endif
#ifdef HAVE_PYTHON
#include "ai_python.hpp"
#endif
#ifndef DISABLE_EDITOR2
#include "editor2/editor_main.hpp"
#endif
@ -1808,12 +1804,6 @@ static int process_command_args(int argc, char** argv) {
<< " --nomusic runs the game without music.\n"
<< " --nosound runs the game without sounds and music.\n"
<< " --path prints the path to the data directory and exits.\n"
#ifdef HAVE_PYTHON
<< " --python-api prints the runtime documentation for the python\n"
<< " API.\n"
<< " --python-shell invokes wesnoth's embedded python interpreter in\n"
<< " interactive mode.\n"
#endif
<< " -r, --resolution XxY sets the screen resolution. Example: -r 800x600\n"
<< " --smallgui allows to use screen resolutions down to 800x480\n"
<< " and resizes a few interface elements.\n"
@ -1843,18 +1833,6 @@ static int process_command_args(int argc, char** argv) {
std::cout << game_config::path
<< "\n";
return 0;
#ifdef HAVE_PYTHON
} else if(val == "--python-api") {
python_ai::invoke("documentation");
return 0;
} else if(val == "--python-shell") {
int ret = python_ai::run_shell();
if (ret==0) {
return 0;
} else {
return 2;
}
#endif
} else if(val == "--config-dir") {
if (argc <= ++arg)
break;

View File

@ -657,17 +657,8 @@ config connect::side::get_config() const
res["id"] = res["save_id"];
config *ai = res.child("ai");
if (!ai) ai = &res.add_child("ai");
#ifdef HAVE_PYTHON
if (ai_algorithm_.substr(ai_algorithm_.length() - 3) == ".py") {
(*ai)["ai_algorithm"] = "python_ai";
(*ai)["python_script"] = ai_algorithm_;
}
else
#endif
{
if (ai_algorithm_ != "default")
(*ai)["ai_algorithm"] = ai_algorithm_;
}
if (ai_algorithm_ != "default")
(*ai)["ai_algorithm"] = ai_algorithm_;
}
description = N_("Computer player");
break;

View File

@ -10,11 +10,6 @@
* before and after each release.
*/
// We are building with scons, so Python cannot be absent.
// this definition has to be done somewhere else or the normal builds
// via autotools are broken, which is ATM not acceptable
//#define HAVE_PYTHON
// without this ifdef DUMMYLOCALES break, so leave it in even though is seems
// to not have any real purpose...
#ifdef HAVE_CONFIG_H

View File

@ -73,13 +73,10 @@ env = Environment()
env["CC"] = "i586-mingw32msvc-gcc"
env["CXX"] = "i586-mingw32msvc-g++"
# Dependencies: SDL, SDL_net, SDL_mixer, SDL_image, freetype, libintl, python
# Dependencies: SDL, SDL_net, SDL_mixer, SDL_image, freetype, libintl
# This is where I put the dependency headers
env.Append(CPPPATH = ["../win-deps/include"])
# This is where I put the python headers.
env.Append(CPPPATH = ["../win-deps/include/python24"])
# This is where I put the dependency libs
env.Append(LIBPATH = ["../win-deps/lib"])
@ -88,7 +85,7 @@ env.Append(CCFLAGS = ["-O2", "-mthreads", "-DHAVE_PYTHON"])
env.Append(LINKFLAGS = ["-s", "-mwindows", "-lmingwthrd"])
env.Append(CPPPATH = ["src", "src/widgets"])
env.Append(LIBS = ["mingw32", "SDLmain", "SDL", "SDL_net", "SDL_mixer", "SDL_image",
"intl", "wsock32", "freetype", "python24", "boost-iostreams", "z"])
"intl", "wsock32", "freetype", "boost-iostreams", "z"])
# Scons stuff
env.BuildDir("../build", "src")