###################################################################################
# MIT License
# Copyright (c) 2015-2025 Gregory Paradis
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
###################################################################################
"""
This module implements functions for building and running the wood supply simulation
models.
The ``ForestModel`` and ``DevelopmentType`` classes constitute the core functional units of this module, and of the ``ws3`` package in general.
"""
import math
import sys
import re
import copy
import operator
import random
import itertools
from itertools import chain
from functools import reduce
_cfi = chain.from_iterable
from collections import defaultdict as dd
import pandas as pd
#from numba import njit
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from multiprocessing import get_context
import dill
from collections import defaultdict
try:
from ws3 import common
from ws3 import core
from ws3 import opt
except: # "__main__" case
from ws3 import common
from ws3 import core
from ws3 import opt
from ws3.common import timed
from pdb import set_trace
import types
import functools
from itertools import islice
from ws3.forest_helper import MP_CONTEXT, _GLOBAL_MODEL_GEN_VARS, _GLOBAL_COEFF_FUNCS_GEN_VARS, _GLOBAL_WORKERS_GEN_VARS
from ws3.forest_helper import choose_max_batch_factor, auto_batch, worker_summarize_tree_batch, sanitize_func
from ws3.forest_helper import init_worker_gen_vars, worker_gen_vars
from ws3.forest_helper import worker_cmp_cflw_batch, worker_cmp_cflw_phase3, worker_cmp_cflw_phase3_batch
from ws3.forest_helper import worker_cmp_cgen_batch, worker_cmp_cgen_phase3, worker_cmp_cgen_phase3_batch
from ws3.forest_helper import PersistentWorkerPool
[docs]
class GreedyAreaSelector:
"""
Default AreaSelector implementation. Selects areas for treatment from oldest age classes.
"""
def __init__(self, parent):
self.parent = parent
[docs]
def operate(self, period, acode, target_area, mask=None,
commit_actions=True, verbose=False):
"""
Greedily operate on oldest operable age classes.
Returns missing area (i.e., difference between target and operated areas).
:param int period: The time period for the operation.
:param str acode: The action code to specify the action.
:param float target_area: The desired area to be achieved through operation.
:param tuple mask: Tuple of values for development types.
:param bool commit_actions: Flag indicating whether to commit actions. Defaults to True.
:param bool verbose: Verbosity flag. Defaults to False.
"""
key = lambda item: max(item[1])
odt = sorted(list(self.parent.operable_dtypes(acode, period, mask).items()), key=key)
if verbose:
print(' entering selector.operate()', len(odt), 'operable dtypes')
while target_area > 0 and odt:
while target_area > 0 and odt:
popped = odt.pop()
try:
dtk, ages = popped #odt.pop()
except:
print(odt)
print(popped)
raise
age = sorted(ages)[-1]
oa = self.parent.dtypes[dtk].operable_area(acode, period, age)
if not oa: continue # nothing to operate
area = min(oa, target_area)
target_area -= area
if area < 0:
print('negative area', area, oa, target_area, acode, period, age)
assert False
if verbose:
print(' selector found area', [' '.join(dtk)], acode, period, age, area)
self.parent.apply_action(dtk, acode, period, age, area, compile_c_ycomps=True,
fuzzy_age=False, recourse_enabled=False, verbose=verbose)
odt = sorted(list(self.parent.operable_dtypes(acode, period, mask).items()), key=key)
self.parent.commit_actions(period, repair_future_actions=True)
if verbose:
print('GreedyAreaSelector.operate done (remaining target_area: %0.1f)' % target_area)
return target_area
[docs]
class Action:
"""
Encapsulates data for an action.
"""
def __init__(self,
code,
targetage=None,
descr='',
lockexempt=False,
components=None,
partial=None,
is_harvest=0,
is_sticky=0):
self.code = code
self.targetage = targetage
self.descr = descr
self.lockexempt = lockexempt
self.oper_a = None
self.oper_p = None
self.components = components or []
self.partial = partial or []
self.is_compiled = False
self.is_harvest = is_harvest
self.is_sticky = is_sticky
self.treatment_type = None
[docs]
class DevelopmentType:
"""
Encapsulates Forest development type data (curves, age, area), and provides methods to operate on the data.
"""
_bo = {'AND':operator.and_, '&':operator.and_, 'OR':operator.or_, '|':operator.or_}
def __init__(self,
key,
parent):
"""
The key is basically the fully expanded mask (expressed as a tuple of values).
The parent is a reference to the ForestModel object in which self is embedded.
"""
self.key = key
self.parent = parent
self._rc = parent.register_curve # shorthand
self._max_age = parent.max_age
self._ycomps = {}
self._complex_ycomps = {}
self._zero_curve = parent.common_curves['zero']
self._unit_curve = parent.common_curves['unit']
self._ages_curve = parent.common_curves['ages']
self._resolvers = {'MULTIPLY':self._resolver_multiply,
'DIVIDE':self._resolver_divide,
'SUM':self._resolver_sum,
'CAI':self._resolver_cai,
'MAI':self._resolver_mai,
'YTP':self._resolver_ytp,
'RANGE':self._resolver_range}
self.transitions = {} # keys are (acode, age) tuples
#######################################################################
# Use period 0 slot to store starting inventory.
self._areas = {p:dd(float) for p in range(0, self.parent.horizon+1)}
#######################################################################
self.oper_expr = dd(list)
self.operability = {}
[docs]
def operable_ages(self, acode, period):
"""
Finds list of ages at which self is operable, given an action code and period index.
"""
if acode not in self.oper_expr: # action not defined for this development type
return None
if acode not in self.operability: # action not compiled yet...
if self.compile_action(acode) == -1: return None # never operable
if period not in self.operability[acode]:
return None
else:
lo, hi = self.operability[acode][period]
return list(set(range(lo, hi+1)).intersection(list(self._areas[period].keys())))
[docs]
def is_operable(self, acode, period, age=None, verbose=False):
"""
Test hypothetical operability.
Does not imply that there is any operable area in current inventory.
:param str acode: The action code to test operability for.
:param int period: The period to test operability for.
:param int age: The age to test operability for. If None, only checks operability for the period.
:param bool verbose: If True, prints additional information for debugging purposes. Default is False.
"""
if acode not in self.oper_expr: # action not defined for this development type
if verbose: print('acode operability undefined', acode, self.oper_expr)
return False
if acode not in self.operability: # action not compiled yet...
if self.compile_action(acode) == -1:
if verbose: print('never operable', acode)
return False # never operable
if period not in self.operability[acode]:
return False
else:
lo, hi = self.operability[acode][period]
if age is not None:
return age >= lo and age <= hi
else:
return lo, hi
[docs]
def operable_area(self, acode, period, age=None, cleanup=True):
"""
Returns 0 if inoperable or no current inventory, operable area given action code and period
(and optionally age) index otherwise.
:param str acode: The action code to determine operability.
:param int period: The period to determine operability for.
:param int age: The age to determine operability for. If None, only checks operability for the period.
:param bool cleanup: If True (default), removes the age class from the inventory dict if operable area is less than self.parent.area_epsilon.
"""
if acode not in self.oper_expr: # action not defined for this development type
return 0.
if acode not in self.operability: # action not xf yet...
if self.compile_action(acode) == -1: return 0. # never operable
if age is None: # return total operable area
return sum(self.operable_area(acode, period, a) for a in list(self._areas[period].keys()))
if age not in self._areas[period]: # age class not in inventory
return 0.
elif abs(self._areas[period][age]) < self.parent.area_epsilon: # negligible area
if cleanup: # remove ageclass from dict (frees up memory)
del self._areas[period][age]
return 0.
elif self.is_operable(acode, period, age):
return self._areas[period][age]
else:
return 0.
assert False
[docs]
def area(self, period, age=None, area=None, delta=True):
"""
If area not specified, returns area inventory for period (optionally age), else sets area for period and age.
If delta switch active (default True), area value is interpreted as an increment on current inventory.
:param int period: The period for which the area is being retrieved or set.
:param int age: The age for which the area is being retrieved or set. If None, returns total area.
:param float area: The area value to set. If None, returns the area inventory.
:param bool delta: If True (default), interprets the area value as an increment on the current inventory.
If False, sets the area value directly.
"""
if area is None: # return area for period and age
if age is not None:
try:
return self._areas[period][age]
except Exception as e:
print(e)
return 0.
else: # return total area
return sum(self._areas[period][a] for a in self._areas[period])
else:
if delta:
self._areas[period][age] += area
else:
self._areas[period][age] = area
[docs]
def resolve_condition(self, yname, lo, hi):
"""
Find lower and upper ages that correspond to lo and hi values of yname (interpreted as first occurence of yield value, reading curve from left and right, respectively).
"""
return [x for x, y in enumerate(self.ycomp(yname)) if y >= lo and y <= hi]
[docs]
def reset_areas(self, period=None):
"""
Reset areas dictionary.
"""
periods = self.parent.periods if period is None else [period]
for period in periods:
self._areas[period] = dd(float)
[docs]
def ycomps(self):
"""
Returns list of yield component keys.
"""
return list(self._ycomps.keys())
[docs]
def ycomp(self, yname, silent_fail=True):
"""
Returns the yield components associated with the given yield name. Returns None if the yield name is not found and silent_fail is True.
:param str yname: The name of the yield to retrieve components for.
:param bool silent_fail: If True (default), returns None if the yield name is not found. If False, raises a KeyError that yield name is not found.
"""
if yname in self._ycomps:
if not self._ycomps[yname]: # complex ycomp not compiled yet
self._compile_complex_ycomp(yname)
return self._ycomps[yname]
else: # not a valid yname
if silent_fail:
return None
else:
raise KeyError("ycomp '%s' not in development type '%s'" % (yname, ' '.join(self.key)))
def _o(self, s, default_ycomp=None): # resolve string operands
if not default_ycomp: default_ycomp = self._zero_curve
if common.is_num(s):
return float(s)
elif s.startswith('#'):
return self.parent.constants[s[1:]]
else:
s = s.lower() # just to be safe
ycomp = self.ycomp(s)
return ycomp if ycomp else default_ycomp
def _resolver_multiply(self, yname, d):
args = [self._o(s.lower()) for s in re.split(r'\s?,\s?', re.search(r'(?<=\().*(?=\))', d).group(0))]
##################################################################################################
# NOTE: Not consistent with Remsoft documentation on 'complex-compound yields' (fix me)...
ytype_set = set(a.type for a in args if isinstance(a, core.Curve))
return ytype_set.pop() if len(ytype_set) == 1 else 'c', self._rc(reduce(lambda x, y: x*y, args))
##################################################################################################
def _resolver_divide(self, yname, d):
_tmp = list(zip(re.split(r'\s?,\s?', re.search(r'(?<=\().*(?=\))', d).group(0)),
(self._zero_curve, self._unit_curve)))
args = [self._o(s, default_ycomp) for s, default_ycomp in _tmp]
return args[0].type if not args[0].is_special else args[1].type, self._rc(args[0] / args[1])
def _resolver_sum(self, yname, d):
args = [self._o(s.lower()) for s in re.split(r'\s?,\s?', re.search(r'(?<=\().*(?=\))', d).group(0))]
ytype_set = set(a.type for a in args if isinstance(a, core.Curve))
return ytype_set.pop() if len(ytype_set) == 1 else 'c', self._rc(reduce(lambda x, y: x+y, [a for a in args]))
def _resolver_cai(self, yname, d):
arg = self._o(re.split(r'\s?,\s?', re.search(r'(?<=\().*(?=\))', d).group(0))[0])
return arg.type, self._rc(arg.mai())
def _resolver_mai(self, yname, d):
arg = self._o(re.split(r'\s?,\s?', re.search(r'(?<=\().*(?=\))', d).group(0))[0])
return arg.type, self._rc(arg.mai())
def _resolver_ytp(self, yname, d):
arg = self._o(re.search(r'(?<=\().*(?=\))', d).group(0).lower())
return arg.type, self._rc(arg.ytp())
def _resolver_range(self, yname, d):
args = [self._o(s.lower()) for s in re.split(r'\s?,\s?', re.search(r'(?<=\().*(?=\))', d).group(0))]
arg_triplets = [args[i:i+3] for i in range(0, len(args), 3)]
range_curve = self._rc(reduce(lambda x, y: x*y, [t[0].range(t[1], t[2]) for t in arg_triplets]))
return args[0].type, self._rc(reduce(lambda x, y: x*y, [t[0].range(t[1], t[2]) for t in arg_triplets]))
def _compile_complex_ycomp(self, yname):
expression = self._complex_ycomps[yname]
keyword = re.search(r'(?<=_)[A-Z]+(?=\()', expression).group(0)
try:
ytype, ycomp = self._resolvers[keyword](yname, expression)
ycomp.label = yname
ycomp.type = ytype
self._ycomps[yname] = ycomp
except KeyError:
raise ValueError('Problem compiling complex yield: %s, %s' % (yname, expression))
[docs]
def compile_actions(self, verbose=False):
"""
Compile all actions.
:param bool verbose: Verbosity flag. Defaults to False.
"""
for acode in self.oper_expr:
self.compile_action(acode, verbose)
[docs]
def compile_action(self, acode, verbose=False):
"""
Compile action, given action code.
This mostly involves resolving operability expression strings into
lower and upper operability limits, defined as (alo, ahi) age pair for each period.
Deletes action from self if not operable in any period.
:param str acode: The action code.
:param bool verbose: Verbosity flag. Defaults to False.
"""
self.operability[acode] = {}
for expr in self.oper_expr[acode]:
self._compile_oper_expr(acode, expr, verbose)
is_operable = False
for p in self.operability[acode]:
if self.operability[acode][p] is not None:
is_operable = True
if not is_operable:
if verbose: print('not operable (deleting):', acode)
del self.operability[acode]
del self.oper_expr[acode]
return -1
else:
if verbose: print('operable:', acode) #, self.operability[acode]
return 0
def _compile_oper_expr(self, acode, expr, verbose=False):
expr = expr.replace('&', 'and').replace('|', 'or')
oper = None
plo, phi = 1, self.parent.horizon # count periods from 1, as in Forest...
alo, ahi = 0, self._max_age
if 'and' in expr:
oper = 'and'
elif 'or' in expr:
oper = 'or'
alo, ahi = self._max_age+1, -1
cond_comps = expr.split(' %s ' % oper)
lhs, rel_operators, rhs = list(zip(*[cc.split(' ') for cc in cond_comps]))
rhs = list(map(float, rhs))
_plo, _phi, _alo, _ahi = None, None, None, None
for i, o in enumerate(lhs):
if o == '_cp':
period = int(rhs[i])
assert period <= self.parent.horizon # sanity check
#################################################################
# Nonsense to relate time-based and age-based conditions with OR?
# Recondider if this actually ever comes up...
assert oper != 'or'
#################################################################
if rel_operators[i] == '=':
_plo, _phi = period, period
elif rel_operators[i] == '>=':
_plo = period
elif rel_opertors[i] == '<=':
_phi = period
else:
raise ValueError('Bad relational operator.')
plo, phi = max(_plo, plo), min(_phi, phi)
elif o == '_age':
age = int(rhs[i])
if rel_operators[i] == '=':
_alo, _ahi = age, age
elif rel_operators[i] == '>=':
_alo = age
elif rel_operators[i] == '<=':
_ahi = age
else:
raise ValueError('Bad relational operator.')
else: # must be yname
ycomp = self.ycomp(o)
if rel_operators[i] == '=':
_alo = _ahi = ycomp.lookup(rhs[i])
elif rel_operators[i] == '>=':
_alo = ycomp.lookup(rhs[i])
elif rel_operators[i] == '<=':
_ahi = ycomp.lookup(rhs[i])
else:
raise ValueError('Bad relational operator.')
if oper == 'and' or not oper:
if _alo is not None: alo = max(_alo, alo)
if _ahi is not None: ahi = min(_ahi, ahi)
else: # or
if _alo is not None: alo = min(_alo, alo)
if _ahi is not None: ahi = max(_ahi, ahi)
assert plo <= phi # should never explicitly declare infeasible period range...
for p in range(plo, phi+1):
assert alo <= ahi
self.operability[acode][p] = (alo, ahi) if alo <= ahi else None
[docs]
def add_ycomp(self, ytype, yname, ycomp, first_match=True):
"""
Adds a yield component to the yield components.
:param str ytype: The type of yield component to add ('c' for complex).
:param str yname: The name of the yield component.
:param str ycomp: The yield component to add.
:param bool first_match: Flag indicating whether to only add the component if it doesn't already exist. Defaults to True.
"""
if first_match and yname in self._ycomps: return # already exists (reject)
if ytype == 'c':
self._complex_ycomps[yname] = ycomp
self._ycomps[yname] = None
if isinstance(ycomp, core.Curve):
self._ycomps[yname] = ycomp
[docs]
def grow(self, start_period=1, cascade=True):
"""
Grow self (default starting period 1, and cascading to end of planning horizon).
:param int start_period: The starting period for growth (default is 1).
:param bool cascade: If True, growth cascades to the end of the planning horizon. Default is True.
"""
end_period = start_period + 1 if not cascade else self.parent.horizon
for p in range(start_period, end_period):
self.reset_areas(p+1) #, self._areas[p], self._areas[p+1] # WTF?
#for age, area in list(self._areas[p].items()): self._areas[p+1][age+1] = area
for age, area in list(self._areas[p].items()): self._areas[p+1][age+self.parent.period_length] = area
[docs]
def overwrite_initial_areas(self, period):
"""
Overwrites the initial areas for a specified period.
"""
self._areas[0] = copy.copy(self._areas[period])
self.initialize_areas()
[docs]
def initialize_areas(self):
"""
Copy initial inventory to period-1 inventory.
"""
self._areas[1] = copy.copy(self._areas[0])
[docs]
class Output:
"""
Encapsulates data and methods to operate on aggregate outputs from the model.
Emulates behaviour of Forest outputs.
.. warning::
Behaviour of Forest outputs is quite complex. This class needs more work before it is used in a production setting (i.e., resolution of some complex output cases is buggy).
"""
def __init__(self,
parent,
code=None,
expression=None,
factor=(1., 1),
description='',
theme_index=-1,
is_basic=False,
is_level=False):
self.parent = parent
self.code = code
self.expression = expression
self._factor = factor
self.description = description
self.theme_index = theme_index
self.is_themed = True if theme_index > -1 else False
self.is_basic = is_basic
if is_basic:
self._compile_basic(expression) # shortcut
elif not is_level:
self._compile(expression) # will detect is_basic
self.is_level = is_level
def _lval(self, s):
"""
Resolve left operand in sub-expression.
"""
if s.lower() in self.parent.outputs:
return self.parent.outputs[s.lower()]
else: # expression
return s.lower()
def _rval(self, s):
"""
Resolve right operand in sub-expression.
"""
if common.is_num(s):
return float(s)
elif s.startswith('#'):
return self.parent.constants[s[1:].lower()]
else: # time-based ycomp code
return s.lower()
def _compile(self, expression):
"""
Resolve operands in expression to the extent possible.
Can be basic or summary.
Assuming operand pattern:
lval_1 [*|/ rval_1] +|- .. +|- lval_n [*|/ rval_n]
where
lval := ocode or expression
rval := number or #constant or ycomp
"""
t = re.split(r'\s+(\+|-)\s+', expression)
ocomps = t[::2] # output component sub-expressions
signs = [1.] # implied + in front of expression
signs.extend(1. if s == '+' else -1 for s in t[1::2])
factors = [(1., 1) for i in ocomps]
for i, s in enumerate(ocomps):
tt = re.split(r'\s+(\*|/)\s+', s) # split on */ operator
lval = self._lval(tt[0])
if len(tt) > 1:
factors[i] = self._rval(tt[2]), 1 if tt[1] == '*' else -1
if not isinstance(lval, Output):
if len(ocomps) == 1: # simple basic output (special case)
self.is_basic = True
self._factor = factors[0]
self._compile_basic(lval)
return
else: # compound basic output
ocomps[i] = Output(parent=self.parent,
expression=lval,
factor=factors[i],
is_basic=True)
else: # summary output
ocomps[i] = lval #self.parent.outputs[lval]
self._ocomps = ocomps
self._signs = signs
self._factors = factors
def _compile_basic(self, expression):
# clean up (makes parsing easier)
s = re.sub(r'\s+', ' ', expression) # separate tokens by single space
s = s.replace(' (', '(') # remove space to left of left parentheses
t = s.lower().split(' ')
# filter dtypes, if starts with mask
mask = None
if not (t[0] == '@' or t[0] == '_' or t[0] in self.parent.actions):
mask = tuple(t[:self.parent.nthemes()])
t = t[self.parent.nthemes():] # pop
# extract @AGE or @YLD condition, if present
self._ages = None
self._condition = None
if t[0].startswith('@age'):
lo, hi = [int(a)+i for i, a in enumerate(t[0][5:-1].split('..'))]
hi = min(hi, self.parent.max_age+1) # they get carried away with range bounds...
self._ages = list(range(lo, hi))
t = t[1:] # pop
elif t[0].startswith('@yld'):
ycomp, args = t[0][5:-1].split(',')
self._condition = tuple([ycomp] + [float(a) for a in args.split('..')])
self._ages = None
t = t[1:] # pop
if not self._ages and not self._condition: self._ages = self.parent.ages
# extract _INVENT or acode
if t[0].startswith('_'): # _INVENT
self._is_invent = True
self._invent_acodes = t[0][8:-1].split(',') if len(t[0]) > 7 else None
self._acode = None
else: # acode
self._is_invent = False
self._invent_acodes = None
self._acode = t[0]
t = t[1:] # pop
# extract _AREA or ycomp
if t[0].startswith('_'): # _AREA
self._is_area = True
self._ycomp = None
else: # acode
self._is_area = False
self._ycomp = t[0]
t = t[1:] # pop
def _evaluate_basic(self, period, factors, verbose=0, cut_corners=True):
result = 0.
if self._invent_acodes:
acodes = [acode for acode in self._invent_acodes if parent.applied_actions[period][acode]]
if cut_corners and not acodes:
return 0. # area will be 0...
for k in list(self.parent.dtypes.keys()):
dt = self.parent.dtypes[k]
if cut_corners and not self._is_invent and k not in self.parent.applied_actions[period][self._acode]:
if verbose: print('bailing on', period, self._acode, ' '.join(k))
continue # area will be 0...
if isinstance(self._factor[0], float):
f = pow(*self._factor)
else:
f = pow(dt.ycomp(self._factor[0])[period], self._factor[1])
for factor in factors:
if isinstance(factor[0], float):
f *= pow(*factor)
else:
f *= pow(dt.ycomp(factor[0])[period], factor[0])
if cut_corners and not f:
if verbose: print('f is null', f)
continue # one of the factors is 0, no point calculating area...
ages = self._ages if not self._condition else dt.resolve_condition(*self._condition)
for age in ages:
area = 0.
if self._is_invent:
if cut_corners and not dt.area(period, age):
continue
if self._invent_acodes:
any_operable = False
for acode in acodes:
if acode not in dt.operability: continue
if dt.is_operable(acode, period, age):
any_operable = True
if any_operable:
area += dt.area(period, age)
else:
area += dt.area(period, age)
else:
assert False # not implemented yet...
y = 1. if self._is_area else dt.ycomp(self._ycomp)[age]
result += y * area * f
return result
def _evaluate_summary(self, period, factors):
result = 0.
for i, ocomp in enumerate(self._ocomps):
result += ocomp(period, [self._factors[i]] + factors)
return result
def _evaluate_basic_themed(self, period):
pass
def _evaluate_summed_themed(self, period):
pass
def __call__(self, period, factors=[(1., 1)]):
if self.is_basic:
return self._evaluate_basic(period, factors)
else:
return self._evaluate_summary(period, factors)
def __add__(self, other):
# assume Output + Output
if self.is_themed:
return [i + j for i, j in zip(self(), other())]
else:
return self() + other()
def __sub__(self, other):
# assume Output - Output
if self.is_themed:
return [i - j for i, j in zip(self(), other())]
else:
return self() - other()
[docs]
class ForestModel:
"""
This is the core class of the ws3 package.
Includes methods import data from various sources, simulate growth and apply actions.
The model can be used in either a (prescriptive) simulation-based approach or a (descriptive)
optimization-based approach.
This class encapsulates all the information used to simulate scenarios from a given dataset (i.e.,
stratified intial inventory, growth and yield functions, action eligibility, transition matrix, action
schedule, etc.), as well as a large collection of functions to import and export data, generate activity
schedules, and simulate application of these schedules (i.e., run scenarios).
At the heart of the ``ForestModel`` class is a list of ``DevelopentType`` instances. Each ``DevelopmentType``
instance encapsulates information about one development type (i.e., a forest stratum, which is an aggregate of
smaller *stands* that make up the raw forest inventory input data). The ``DevelopmentType`` class also
stores a list of operable *actions*, maps *state variable transitions* to these actions, stores growth and
yield functions, and knows how to *grow itself* when time is incremented during a simulation.
A typical use case starts with creating an instance of the ``ForestModel`` class. Then, we need to load data
into this instance, define one or more scenarios (using a mix of heuristic and optimization approaches),
run the scenarios, and export output data to a format suitable for analysis (or link to the next model in
a larger modelling pipeline).
"""
_ytypes = {'*Y':'a', '*YT':'t', '*YC':'c'}
tree = (lambda f: f(f))(lambda a: (lambda: dd(a(a))))
def __init__(self,
model_name,
model_path,
base_year,
horizon=common.HORIZON_DEFAULT,
period_length=common.PERIOD_LENGTH_DEFAULT,
max_age=common.MAX_AGE_DEFAULT,
area_epsilon=common.AREA_EPSILON_DEFAULT,
curve_epsilon=common.CURVE_EPSILON_DEFAULT):
"""
Initializes the ForestModel with the provided parameters.
:param str model_name: The name of model.
:param str model_path: The path to input data of model.
:param int base_year: The base year of teh model.
:param int horizon: The length (in number of periods) of the simulation horizon.
:param int max_age: The maximum age considered in the model.
:param int area_epsilon:
:param int curve_epsilon:
"""
self.model_name = model_name
self.model_path = model_path
self.base_year = base_year
self.set_horizon(horizon)
self.period_length = period_length
self.max_age = max_age
self.ages = list(range(max_age+1))
self._period_to_years_factor = None
self.yields = []
self.ynames = set()
self.actions = {}
self.transitions = {}
self.oper_expr = {}
self._themes = []
self._theme_basecodes = []
self.dtypes = {}
self.constants = {}
self.output_groups = {}
self.outputs = {}
self.applied_actions = {p:{acode:{} for acode in list(self.actions.keys())} for p in self.periods}
self.reset_actions()
self.curves = {}
self.problems = {}
c_zero = self.register_curve(core.Curve('zero',
is_special=True,
type=''))
c_unit = self.register_curve(core.Curve('unit',
points=[(0, 1)],
is_special=True,
type=''))
c_ages = self.register_curve(core.Curve('ages',
points=[(0, 0), (max_age, max_age)],
is_special=True,
type=''))
self.common_curves = {'zero':c_zero,
'unit':c_unit,
'ages':c_ages}
self.area_epsilon = area_epsilon
self.curve_epsilon = curve_epsilon
self.areaselector = GreedyAreaSelector(self)
self.inoperable_dtypes = []
[docs]
def nthemes(self):
"""
Returns number of themes
"""
return len(self._themes)
[docs]
def reset(self):
"""
Resets the forest model by clearing applied actions and reinitializing areas.
"""
self.reset_actions()
self.initialize_areas()
[docs]
def set_horizon(self, horizon):
"""
Sets the horizon of the model.
This method updates the horizon of the model to the specified value and adjusts the list of periods accordingly.
"""
self.horizon = int(horizon)
self.periods = list(range(1, horizon+1))
def _resolve_period_multiplier(self, convert_periods_to_years):
"""Resolve the Woodstock period-to-year multiplier for import helpers."""
if convert_periods_to_years is None:
return self._period_to_years_factor or 1
try:
multiplier = int(convert_periods_to_years)
except Exception as exc:
raise ValueError("convert_periods_to_years must be an integer number of years per period") from exc
if multiplier <= 0:
raise ValueError("convert_periods_to_years must be positive")
if self._period_to_years_factor is None:
self._period_to_years_factor = multiplier
elif self._period_to_years_factor != multiplier:
raise ValueError(
f"convert_periods_to_years={multiplier} conflicts with existing multiplier {self._period_to_years_factor}"
)
return multiplier
[docs]
def compile_actions(self, mask=None, verbose=False):
"""
Compile actions for the development types filtered by mask.
"""
dtype_keys = self.unmask(mask) if mask else list(self.dtypes.keys())
for dtk in dtype_keys:
dt = self.dtypes[dtk]
dt.compile_actions(verbose=verbose)
def _compile_schedule_from_problem(self, problem, formulation=1, skip_null='null'):
"""
Compiles a ``ws3``-compatible schedule data object from a solved ``ws3.opt.Problem`` instance.
This is just a dispatcher function---the actual compilation is done by a formulation-specific function
(assumes *Model I* formulation if not specified).
"""
cmp_sch_dsp = {1:self._cmp_sch_m1, 2:self._cmp_sch_m2}
return cmp_sch_dsp[formulation](problem, skip_null)
[docs]
def add_problem(self, name, coeff_funcs, cflw_e=None, cgen_data=None,
solver=opt.SOLVER_HIGHS, formulation=1, z_coeff_key='z', acodes=None,
sense=opt.SENSE_MAXIMIZE, mask=None, workers=1, verbose=False):
"""
Add an optimization problem to the model.
:param str name: Used as key to store ``ws3.opt.Problem`` instances in a dict in the ``ws3.forest.ForestModel`` instanace, so make
sure it is unique within a given model or you will overwrite dict values (assuming you want to stuff multiple
problems, and their solutions, into your model at the same time).
:param dict coeff_funcs: Dict of function references, keyed on row name strings. These are the functions that generate
the LP optimization problem matrix coefficients (for the objective function and constraint rows). This one gets
complicated, and is a likely source of bugs. Make sure the row name key strings are all unique or you will make
a mess. You can name the constraint rows anything you want, but the objective function row has to be named 'z'.
All coefficient functions must accept exactly two args, in this order: a ``ws3.forest.ForestModel`` instance and a
path (a tuple of ``ws3.core.Node`` object instances). The 'z' coefficient function is special in that it must return a single float value.
All other (i.e., constraint) coefficient functions just return a dict of floats, keyed on period ints (can be
sparse, i.e., not necessary to include key:value pairs in output dict if value is 0.0). It is useful (but not
necessary) to use ``functools.partial`` to specialize a smaller number of more general function definitions (with
more args, that get "locked down" and hidden by ``partial``) as we have done in the example in this notebook.
:param dict cflw_e: Dict of (dict, int) tuples, keyed on row name strings (must match row name key values used to
define coefficient functions for flow constraints in coeff_func dict), where the int:float dict embedded in the
tuple defines epsilon values keyed on periods (must include all periods, even if epsilon value is always the same).
See example below.
``{'foo':({1:0.01, ..., 10:0.01}, 1), 'bar':({1:0.05, ..., 10:0.05}, 1)}``
:param dict cgen_data: Dict of dict of dicts. The outer-level dict is keyed on row name strings (must match row names used
in coeff_funcs. The middle second level of dicts always has keys 'lb' and 'ub', and the inner level of dicts
specifies lower- and upper-bound general constraint RHS (float) values, keyed on period (int). See example below.
``{'foo':{'lb':{1:1., ..., 10:1.}, 'ub':{1:2., ..., 10:2.}}, 'bar':{{'lb':{1:1., ..., 10:1.}, 'ub':{1:2., ..., 10:4.}}}}``
:param int acodes: List of strings. Action codes to be included in optimization problem formulation (actions must defined
in the `ForestModel` instance, but can be only a subset).
:param int sense: Must be one of ``ws3.opt.SENSE_MAXIMIZE`` or ``ws3.opt.SENSE_MINIMIZE``, or equivalent int values (just use the
constants to keep code more legible).
:param tuple mask: Tuple of strings constituting a valid mask for your `ForestModel` instance. Can be `None` if you do not want
to filter `DevelopmentType` instances.
:param int workers: Number of worker threads to use for parallel processing.
:return: ws3.opt.Problem. Reference to a new Problem instance that was created. Also stored in the ForestModel instance (problems attribute,
keyed on problem name).
"""
# --- Prepare serialization for parallel execution ---
if workers > 1:
problems_backup = self.problems
self.problems = None
blob_bytes = dill.dumps(self) # Serialize model
self.problems = problems_backup
rebased_funcs = {k: sanitize_func(f) for k, f in coeff_funcs.items()}
serialized_funcs = {k: dill.dumps(f) for k, f in rebased_funcs.items()}
else:
blob_bytes = None
serialized_funcs = None
# --- Reset model state for problem creation ---
self.reset()
# --- Dispatch maps for formulation type ---
bld_p_dsp = {1: self._bld_p_m1, 2: self._bld_p_m2}
cmp_cflw_dsp = {1: self._cmp_cflw_m1, 2: self._cmp_cflw_m2}
cmp_cgen_dsp = {1: self._cmp_cgen_m1, 2: self._cmp_cgen_m2}
assert formulation == 1, "Only Model I supported for now"
# --- Persistent worker pool (None if workers=1) ---
with PersistentWorkerPool(workers, blob_bytes, serialized_funcs) as executor:
if verbose: print('add_problem: build problem')
p = bld_p_dsp[formulation](
name,
coeff_funcs,
solver,
z_coeff_key,
acodes,
sense,
mask,
workers,
executor, # None if serial
verbose
)
if verbose: print('add_problem: compile flow constraints')
cmp_cflw_dsp[formulation](p, cflw_e, workers=workers, executor=executor, verbose=verbose)
if verbose: print('add_problem: compile general constraints')
cmp_cgen_dsp[formulation](p, cgen_data, workers=workers, executor=executor, verbose=verbose)
# --- Save and return the problem ---
self.problems[name] = p
return p
def _bld_p_m1(
self,
name,
coeff_funcs,
solver,
z_coeff_key='z',
acodes=None,
sense=opt.SENSE_MAXIMIZE,
mask=None,
workers=1,
executor=None,
verbose=False
):
"""
Build a Model I optimization problem with batched, parallel tree processing.
"""
p = opt.Problem(name, sense=sense, solver=solver)
# Step 1: Generate trees and variables
if verbose: print('generate trees using', workers, 'workers')
p.trees, p._vars, p._leaf_ids = self._gen_vars_m1(
coeff_funcs,
acodes=acodes,
mask=mask,
workers=workers,
executor=executor,
verbose=verbose
)
# Step 2: Process trees into coverage constraints
if verbose: print('process trees')
tree_items = list(p.trees.items())
batches = list(auto_batch(tree_items, workers, max_batch_factor=4))
tasks = [(batch, z_coeff_key) for batch in batches]
results = []
if workers == 1:
for task in tasks:
results.extend(worker_summarize_tree_batch(task))
else:
exec_ctx = executor or ProcessPoolExecutor(max_workers=workers, mp_context=get_context(MP_CONTEXT))
futures = [exec_ctx.submit(worker_summarize_tree_batch, task) for task in tasks]
for f in as_completed(futures):
results.extend(f.result())
if executor is None:
exec_ctx.shutdown()
# Step 3: Apply results to the Problem object
if verbose: print('_bld_p_m1: build problem')
for cname, coeffs, z_coeffs in results:
p.add_constraint(name=cname, coeffs=coeffs, sense=opt.SENSE_EQ, rhs=1.0)
p._z.update(z_coeffs)
p.coeff_funcs = coeff_funcs
p.formulation = 1
if verbose: print('_bld_p_m1: done building problem')
return p
def _bld_p_m2(self, problem):
pass # not implemented
def _gen_vars_m1(self, coeff_funcs, acodes=None, mask=None, workers=1, executor=None, verbose=False):
"""
Generate trees, variables, and leaf IDs for Model I problems.
Parallelized with model and coeff_funcs preloaded per worker.
"""
dtype_keys = self.dtypes.keys() if not mask else self.unmask(mask)
# --- Step 1: Build (dtk, age) task list ---
self.reset()
tract_tasks = [
(dtk, age)
for dtk in dtype_keys
for age in self.dtypes[dtk]._areas[1].keys()
if self.dtypes[dtk].area(1, age)
]
# --- Step 3: Serial or Parallel execution ---
results = []
if workers == 1:
# Serial mode (do not use worker functions or it breaks _cbm_sit_yield add_problem hack)
for (dtk, age) in tract_tasks:
self.reset()
area = self.dtypes[dtk].area(1, age)
if not area: continue
tree = self._bld_tree_m1(
area, dtk, age, coeff_funcs,
tree=None, period=1,
acodes=acodes, compile_c_ycomps=True,
verbose=verbose
)
results.append((dtk, age, tree))
else:
task_batches = auto_batch(tract_tasks, workers, max_batch_factor=4)
if not executor:
# prepare serialized model and coeff_funcs
problems_backup = self.problems
self.problems = None
blob_bytes = dill.dumps(self)
self.problems = problems_backup
rebased_funcs = {k: sanitize_func(f) for k, f in coeff_funcs.items()}
serialized_funcs = {k: dill.dumps(f) for k, f in rebased_funcs.items()}
with ProcessPoolExecutor(
max_workers=workers,
mp_context=get_context(MP_CONTEXT),
initializer=init_worker_gen_vars,
initargs=(blob_bytes, serialized_funcs, workers),
) as executor:
futures = [executor.submit(worker_gen_vars, batch, acodes) for batch in task_batches]
for f in as_completed(futures):
res = f.result()
for item in res:
if isinstance(item, Exception): raise item
if item is not None: results.append(item)
else: # use executor that was passed in as arg
futures = [executor.submit(worker_gen_vars, batch, acodes) for batch in task_batches]
for f in as_completed(futures):
res = f.result()
for item in res:
if isinstance(item, Exception): raise item
if item is not None: results.append(item)
# --- Step 4: Restore problems and rebuild trees/vars ---
trees, vars, leaf_ids = {}, {}, {}
for dtk, age, tree in results:
i = (dtk, age)
trees[i] = tree
for path in tree.paths():
j = tuple(n.data('acode') for n in path)
leaf_id = path[-1].data('leaf_id')
vname = f"x_{leaf_id}"
leaf_ids[(i, j)] = leaf_id
vars[vname] = opt.Variable(vname, opt.VTYPE_CONTINUOUS, 0.0, 1.0)
return trees, vars, leaf_ids
def _gen_vars_m2(self):
pass
def _bld_tree_m1(
self,
area,
dtk,
age,
coeff_funcs,
tree=None,
period=1,
acodes=None,
compile_c_ycomps=True,
verbose=False):
"""
Build a tree of feasible action sequences (full-length paths = |periods|).
"""
# --- Step 0: Initialize tree if needed ---
if tree is None:
dt = self.dtypes[dtk]
dt.reset_areas()
self.dtypes[dtk]._areas[1][age] = area
self.reset_actions()
tree = core.Tree()
acodes = list(self.actions.keys()) if not acodes else acodes
# --- Step 1: Depth-First Search (DFS) to build the tree structure ---
for acode in acodes:
if self.dt(dtk).is_operable(acode, period, age):
# Reset actions for this period and grow stand if needed
self.reset_actions(period)
if period > 1:
self.dt(dtk).grow(period - 1, False)
# Apply action to get next state
errorcode, missingarea, tstate = self.apply_action(
dtk, acode, period, age, area,
compile_c_ycomps=compile_c_ycomps,
override_operability=False,
fuzzy_age=False,
recourse_enabled=False
)
if errorcode and verbose:
print(
'apply_action error', dtk, acode, period, age, area,
errorcode, missingarea, tstate
)
_dtk, tprop, _age = tstate[0]
assert tprop == 1. # no split handling yet
# Push node to the tree
tree.grow({
'dtk': dtk, '_dtk': _dtk, 'acode': acode, 'period': period,
'age': age, '_age': _age, 'products': None, 'area': area
})
# Recurse deeper or compute leaf coefficients
if period < self.periods[-1]: # not at last period, continue DFS
self.dt(_dtk).grow(period, False)
self._bld_tree_m1(
area, _dtk, _age + self.period_length,
coeff_funcs, tree, period + 1, acodes,
compile_c_ycomps=compile_c_ycomps)
elif period == self.periods[-1]: # reached a leaf
path = tree.path()
leaf = path[-1]
assert leaf.is_leaf()
# Serial leaf coefficient computation
leaf._data.update({k: coeff_funcs[k](self, path) for k in coeff_funcs})
i = (path[0].data('dtk'), path[0].data('age'))
j = tuple(node.data('acode') for node in path)
leaf._data['leaf_id'] = common.hex_id((i, j))
# Pop node from the tree (DFS backtrack)
tree.ungrow()
return tree
def _cmp_cflw_m1(self, problem, cflw_e, workers=1, executor=None, verbose=False):
"""
Compile flow (even-flow) constraints in parallel using batched workers.
Optimized for less overhead while respecting the original (i, j) API.
"""
if not cflw_e:
return
periods = self.periods
cflw_keys = list(cflw_e.keys())
if verbose: print("_cmp_cflw_m1: phase 1")
# Phase 1: Compute mu values in parallel
tree_items = list(problem.trees.items())
batches = auto_batch(
tree_items,
workers,
size_fn=lambda x: len(x[1].nodes()),
max_batch_factor=1
)
tasks = [(batch, cflw_keys, periods) for batch in batches]
if workers == 1:
results = []
for task in tasks:
results.extend(worker_cmp_cflw_batch(task))
else:
# Use existing executor if passed
exec_ctx = executor or ProcessPoolExecutor(max_workers=workers, mp_context=get_context(MP_CONTEXT))
futures = [exec_ctx.submit(worker_cmp_cflw_batch, task) for task in tasks]
# Collect results without repeated extend() overhead
results_nested = [f.result() for f in as_completed(futures)]
results = [item for batch in results_nested for item in batch]
if executor is None:
exec_ctx.shutdown()
# Phase 2: Merge results into mu dict
if verbose: print("_cmp_cflw_m1: phase 2")
mu = {t: {o: {} for o in cflw_keys} for t in periods}
for t, o, i, j, val in results:
mu[t][o][(i, j)] = val
# Phase 3: Build constraints (parallel with batching)
if verbose: print("_cmp_cflw_m1: phase 3")
leaf_ids = problem._leaf_ids
xnames = {k: f"x_{v}" for k, v in leaf_ids.items()}
add_constraint = problem.add_constraint
# Build Phase 3 tasks
tasks = []
for t in periods:
for o, e in cflw_e.items():
eps_dict, ref_period = e
if t not in eps_dict:
continue
mu_t_o = mu[t][o]
mu_ref_o = mu[ref_period][o]
eps = eps_dict[t]
tasks.append((t, o, mu_t_o, mu_ref_o, eps, xnames))
results = []
if workers == 1:
# Serial processing
for task in tasks:
results.extend(worker_cmp_cflw_phase3(task))
else:
# Create batches of tasks for more efficient multiprocessing
batches = auto_batch(tasks, workers, max_batch_factor=2)
exec_ctx = executor or ProcessPoolExecutor(max_workers=workers, mp_context=get_context(MP_CONTEXT))
futures = [exec_ctx.submit(worker_cmp_cflw_phase3_batch, batch) for batch in batches]
for f in as_completed(futures):
results.extend(f.result())
if executor is None:
exec_ctx.shutdown()
# Add constraints sequentially
for name, coeffs, sense, rhs in results:
add_constraint(name=name, coeffs=coeffs, sense=sense, rhs=rhs)
def _cmp_cflw_m2(self):
pass # not implemented
# def _cmp_cgen_m1(self, problem, cgen_data, workers=1, executor=None, verbose=False):
# print('foo')
# if not cgen_data: return
# mu = {t:{o:{} for o in list(cgen_data.keys())} for t in self.periods}
# for i, tree in list(problem.trees.items()):
# for path in tree.paths():
# j = tuple(n.data('acode') for n in path)
# for o in list(cgen_data.keys()):
# _mu = path[-1].data(o)
# for t in self.periods:
# mu[t][o][i, j] = _mu[t] if t in _mu else 0.
# for o, b in list(cgen_data.items()):
# for t in self.periods:
# _mu = {'x_%s' % common.hex_id((i, j)):mu[t][o][i, j] for i, j in mu[t][o]}
# if b['lb'] is not None and t in b['lb']:
# problem.add_constraint(name='gen-lb_%03d_%s' % (t, o), coeffs=_mu, sense=opt.SENSE_GEQ, rhs=b['lb'][t])
# if b['ub'] is not None and t in b['ub']:
# problem.add_constraint(name='gen-ub_%03d_%s' % (t, o), coeffs=_mu, sense=opt.SENSE_LEQ, rhs=b['ub'][t])
def _cmp_cgen_m1(self, problem, cgen_data, workers=1, executor=None, verbose=False):
"""
Compile general (CGEN) constraints.
Phase 1+2 are kept IDENTICAL to the known-good serial logic to avoid coefficient drift.
Only Phase 3 (row emission) is parallelized/batched.
"""
if not cgen_data:
return
periods = self.periods
cgen_keys = list(cgen_data.keys())
# --- Phase 1+2: build mu exactly like the reference implementation ---
if verbose: print("_cmp_cgen_m1: phase 1 and 2")
mu = {t: {o: {} for o in cgen_keys} for t in periods}
for i, tree in problem.trees.items():
for path in tree.paths():
j = tuple(n.data("acode") for n in path)
leaf = path[-1]
for o in cgen_keys:
_mu = leaf.data(o) # dict(period -> value)
for t in periods:
mu[t][o][(i, j)] = _mu[t] if t in _mu else 0.0
# --- Phase 3: build rows (can parallelize safely) ---
# Build tasks
if verbose: print("_cmp_cgen_m1: phase 1")
tasks = []
for o, bounds in cgen_data.items():
lb, ub = bounds.get('lb'), bounds.get('ub')
for t in periods:
tasks.append((t, o, mu[t][o], lb, ub))
results = []
if workers == 1:
for task in tasks:
results.extend(worker_cmp_cgen_phase3(task))
else:
batches = auto_batch(tasks, workers) # use your tuned auto_batch; no forced factor
exec_ctx = executor or ProcessPoolExecutor(max_workers=workers, mp_context=get_context(MP_CONTEXT))
futures = [exec_ctx.submit(worker_cmp_cgen_phase3_batch, batch) for batch in batches]
for f in as_completed(futures):
results.extend(f.result())
if executor is None:
exec_ctx.shutdown()
# Add rows
add_constraint = problem.add_constraint
for name, coeffs, sense, rhs in results:
add_constraint(name=name, coeffs=coeffs, sense=sense, rhs=rhs)
def _cmp_cgen_m2(self):
pass # not implemented
def _cmp_sch_m1(self, problem, skip_null):
_sch = [[] for t in self.periods]
sln = problem.solution()
if not sln: return None
for i, tree in list(problem.trees.items()):
for path in tree.paths():
x = 'x_%s' % common.hex_id((i, tuple(n.data('acode') for n in path)))
if not sln[x]: continue
for t, n in enumerate(path):
d = n.data()
if skip_null and d['acode'] == skip_null: continue
etype = '_existing' if self.dt(i[0]).area(0) else '_future'
_sch[t].append((d['dtk'], d['age'], d['area'] * sln[x], d['acode'], d['period'], etype))
return list(itertools.chain.from_iterable(_sch))
def _cmp_sch_m2(self, problem):
pass
[docs]
def add_null_action(self, acode='null', minage=None, maxage=None):
"""
Adds a null action with the specified action code, minimum age (default is None), and maximum age (default is None).
"""
mask = tuple(['?' for _ in range(self.nthemes())])
oe = '_age >= 0 and _age <= %i' % self.max_age
target = [(mask, 1.0, None, None, None, None, None)]
self.actions[acode] = Action(acode)
self.oper_expr[acode] = {mask:oe}
self.transitions[acode] = {mask:{'':target}}
for dtk in self.dtypes:
self.dtypes[dtk].oper_expr[acode] = [oe]
self.dtypes[dtk].transitions[acode, -1] = target
for p in self.applied_actions:
self.applied_actions[p][acode] = {}
[docs]
def is_harvest(self, acode):
"""
Returns True if acode corresponds to a harvesting action.
"""
return self.actions[acode].is_harvest
[docs]
def piece_size(self, dtype_key, age):
"""
Returns piece size, given development type key and age.
"""
return self.dtypes[dtype_key].ycomp(self.piece_size_yname)[age] * self.piece_size_factor
[docs]
def dt(self, dtype_key):
"""
Returns development type, given key (returns None on invalid key).
"""
try:
return self.dtypes[dtype_key]
except:
return None
[docs]
def age_class_distribution(self, period, mask=None, omit_null=False):
"""
Returns age class distribution (dict of areas, keys on age).
:param int period: The period for which to retrieve the age class distribution.
:param tuple mask: A mask to filter development types. Default is None.
:param bool omit_null: If True, omits null areas from the distribution. Default is False.
:return: A dictionary where keys are ages and values are the corresponding area distributions.
"""
result = {age:0. for age in self.ages}
dtype_keys = self.unmask(mask) if mask else list(self.dtypes.keys())
for dtk in dtype_keys:
dt = self.dtypes[dtk]
for age in dt._areas[period]:
result[age] += dt._areas[period][age]
if omit_null:
result = {k:v for k, v in result.items() if v}
return result
[docs]
def operable_dtypes(self, acode, period, mask=None):
"""
Returns dict (keyed on development type key, values are lists of operable ages).
"""
result = {}
dtype_keys = self.unmask(mask) if mask else list(self.dtypes.keys())
for dtk in dtype_keys:
dt = self.dtypes[dtk]
operable_ages = dt.operable_ages(acode, period)
if operable_ages:
result[dt.key] = operable_ages
return result
[docs]
def inventory(self, period, yname=None, age=None, mask=None, dtype_keys=None, verbose=0):
"""
Flexible method that compiles inventory at given period.
Unit of return data defaults to area if yname not given,
but takes on unit of yield component otherwise.
Can be constrained by age and development type mask.
"""
result = 0.
assert not (mask and dtype_keys) # too confusing to allow both to be specified...
if mask:
_dtype_keys = self.unmask(mask, verbose=verbose)
elif dtype_keys:
_dtype_keys = dtype_keys
else:
_dtype_keys = list(self.dtypes.keys())
shift = self.period_length
for dtk in _dtype_keys:
dt = self.dtypes[dtk]
if period == 0:
inventory_map = dt._areas[0]
else:
inventory_map = dd(float)
for src_age, src_area in dt._areas[period].items():
aged_age = src_age + shift
inventory_map[aged_age] += src_area
if yname:
ycomp = dt.ycomp(yname)
if not ycomp:
continue
ymax = getattr(ycomp, "xmax", None)
else:
ycomp = None
ymax = None
if age is not None:
value = 0.0
if age in inventory_map:
if ycomp:
lookup_age = min(age, ymax) if ymax is not None else age
factor = ycomp[lookup_age]
else:
factor = 1.0
value = inventory_map[age] * factor
result += value
else:
if ycomp:
if ymax is not None:
result += sum(area * ycomp[min(a, ymax)] for a, area in inventory_map.items())
else:
result += sum(area * ycomp[a] for a, area in inventory_map.items())
else:
result += sum(inventory_map.values())
return result
[docs]
def operable_area(self, acode, period, age=None, mask=None):
"""
Returns total operable area, given action code and period (and optionally age).
"""
dtype_keys = list(self.dtypes.keys()) if not mask else self.unmask(mask)
return sum(self.dtypes[dtk].operable_area(acode, period, age) for dtk in dtype_keys)
[docs]
def overwrite_initial_areas(self, period):
"""
Overwrites the initial areas for all development types for the specified period.
"""
for dt in list(self.dtypes.values()): dt.overwrite_initial_areas(period)
[docs]
def initialize_areas(self, reset_areas=True):
"""
Copies areas from period 0 to period 1.
"""
if reset_areas: self.reset_areas()
for dtk in self.dtypes: self.dtypes[dtk].initialize_areas()
[docs]
def reset_areas(self, period=None):
"""
Reset areas for all development types.
"""
for dtk in self.dtypes: self.dtypes[dtk].reset_areas(period)
[docs]
def register_curve(self, curve):
"""
Add curve to global curve hash map (uses result of Curve.points() to construct hash key).
"""
key = tuple(curve.points()) # TO DO: use builtin common.hex_id() function to convert curves to hashed valued?
if key not in self.curves:
# new curve (lock and register)
curve.is_locked = True # points list must not change, else not valid key
self.curves[key] = curve
return self.curves[key]
[docs]
def reset_actions(self, period=None, acode=None, override_sticky=False):
"""
Resets actions.
By default resets, all actions in all periods (except for sticky actions, unless overridden),
unless period or acode specified.
"""
periods = [period] if period else self.periods
acodes = [acode] if acode else list(self.actions.keys())
for p in periods:
if p not in self.applied_actions: self.applied_actions[p] = {}
for a in acodes:
if a in self.actions and self.actions[a].is_sticky and not override_sticky: continue
self.applied_actions[p][a] = {}
[docs]
def compile_product(self,
period,
expr,
acode=None,
dtype_keys=None,
age=None,
coeff=False,
verbose=False):
"""
Compiles products from applied actions in given period. Parses string expression, which resolves to a single coefficient.
Operated area can be filtered on action code, development type key list, and age. Result is product of sum of filtered
area and coefficient.
"""
aa = self.applied_actions
if acode is None:
acodes = list(self.actions.keys())
else:# elif type(acode) == str:
acodes = [acode] if not self.actions[acode].components else self.actions[acode].components
tokens = expr.split(' ')
result = 0.
for _acode in acodes:
if _acode not in list(aa[period].keys()): continue # acode not in solution
_dtype_keys = list(aa[period][_acode].keys()) if dtype_keys is None else dtype_keys
for dtk in _dtype_keys:
if dtk not in list(aa[period][_acode].keys()):
continue
ages = list(aa[period][_acode][dtk].keys()) if age is None else [age]
for _age in ages:
aaa = aa[period][_acode][dtk][_age]
_tokens = []
for token in tokens:
if token in self.ynames: # found reference to ycomp
if token in aaa[1]: # token is yname in products (replace with value)
_tokens.append(str(aaa[1][token]))
else: # assume null value if ycomp exists but not stored in solution
_tokens.append('0.')
else:
_tokens.append(token)
_expr = ' '.join(_tokens)
area = aaa[0] if not coeff else 1.
try:
result += eval(_expr) * area
except ZeroDivisionError:
pass # let this one go...
except:
print(("Unexpected error:", sys.exc_info()[0]))
print("evaluating expression '%s' for case:" % ' '.join(_tokens), period, [' '.join(dtk)], _acode, _age)
raise
return result
[docs]
def operated_area(self, acode, period, dtype_key=None, age=None):
"""
Compiles operated area, given action code and period (and optionally list of development type keys or age).
"""
aa = self.applied_actions
acodes = [acode] if not self.actions[acode].components else self.actions[acode].components
result = 0.
for _acode in acodes:
if not aa[period][_acode]: continue # acode not in solution
dtype_keys = list(aa[period][_acode].keys()) if dtype_key is None else [dtype_key]
for _dtype_key in dtype_keys:
ages = list(aa[period][_acode][_dtype_key].keys()) if age is None else [age]
for _age in ages:
result += aa[period][_acode][_dtype_key][_age][0]
return result
[docs]
def repair_actions(self, period, areaselector=None, verbose=False):
"""
Attempts to repair the action schedule for given period, using an AreaSelector object
(defaults to class-default areaselector, which is a simple greedy oldest-first selector).
"""
if areaselector is None: # use default (greedy) selector
areaselector = self.areaselector
aa = copy.copy(self.applied_actions[period])
self.reset_actions(period)
for acode in aa:
if not aa[acode]: continue # null solution, move along...
old_area = 0.
new_area = 0.
# start by re-applying as much of the old solution as possible
for dtype_key in aa[acode]:
for age in aa[acode][dtype_key]:
aaa = aa[acode][dtype_key][age][0]
old_area += aaa
oa = self.dtypes[dtype_key].operable_area(acode, period, age)
if not oa: continue
applied_area = min(aaa, oa)
#print ' applying old area', applied_area
new_area += applied_area
self.apply_action(dtype_key, acode, period, age, applied_area)
# try to make up for missing area...
target_area = old_area - new_area
if verbose:
print(' patched %i of %i solution hectares, missing' % (int(new_area), int(old_area)), target_area)
if areaselector is None: # use default area selector
areaselector = self.areaselector
areaselector.operate(period, acode, target_area)
[docs]
def commit_actions(self, period=1, repair_future_actions=False, verbose=False):
"""
Commits applied actions (i.e., apply transitions and grow, default starting at period 1).
By default, will attempt to repair broken (infeasible) future actions, attempting to replace infeasiblea operated area using default AreaSelector.
"""
while period < self.horizon:
if verbose: print('growing period', period)
self.grow(period, cascade=False)
period += 1
if repair_future_actions:
if verbose: print('repairing actions in period', period)
self.repair_actions(period)
else:
self.reset_actions(period)
[docs]
def resolve_replace(self, dtk, expr):
"""
Enables the creation of new development types by replacing an existing attribute code with a new value for a specific theme,
instead of directly coding the attribute change in transition.
"""
# HACK ####################################################################
# Too lazy to implement all the use cases.
# This should work OK for BFEC models (TO DO: confirm).
tokens = re.split(r'\s+', expr)
i = int(tokens[0][3]) - 1
try:
return str(eval(expr.replace(tokens[0], dtk[i])))
except:
print('source', ' '.join(dtype_key))
print('target', ' '.join(tmask), tprop, tage, tlock, treplace, tappend)
print('dtk', ' '.join(dtk))
raise
###########################################################################
# HACK ####################################################################
# Too lazy to implement.
# Not used in BFEC models (TO DO: confirm).
[docs]
def resolve_append(self, dtk, expr):
"""
This method has not been implemented yet.
"""
assert False # brick wall (deal with this case later, as needed)
[docs]
def resolve_targetage(self, dtk, tyield, sage, tage, acode, verbose=False):
"""
This method determines the target age for a transition based on different
parameters such as the development type key (dtk), yield information (tyield),
stand age (sage), target age override (tage), and action code (acode).
"""
action = self.actions[acode]
if tyield is not None: # yield-based age definition
if verbose:
print('yield-based age definition', tyield, self.dt(dtk).ycomp(tyield[0]).lookup(tyield[1], roundx=True))
try:
targetage = self.dt(dtk).ycomp(tyield[0]).lookup(tyield[1], roundx=True)
except:
print(' '.join(dtk), tyield[0], self.dt(dtk).ycomps())
assert False
elif tage is not None: # target age override specifed in transition
if verbose: print('_AGE override', tage)
targetage = tage
elif action.targetage is None: # use source age
if verbose: print('source age', age)
targetage = sage
else: # default: age reset to 0
if verbose: print('default age reset to 0')
targetage = 0
return targetage
[docs]
def apply_action(self,
dtype_key,
acode,
period,
age,
area,
override_operability=False,
fuzzy_age=True,
recourse_enabled=True,
areaselector=None,
compile_t_ycomps=False,
compile_c_ycomps=False,
verbose=False):
"""
Applies action, given action code, development type, period, age, area.
Can optionally override operability limits, optionally use fuzzy age (i.e., attempt
to apply action to proximal age class if specified age is not operable), optionally use
default AreaSelector to patch missing area (if recourse enabled). Applying an action is
a rather complex process, involving testing for operability (JIT-compiling operability
expression as required), checking that valid transitions are defined, checking that area
is available (possibly using fuzzy age and area selector functions to find missing area),
generate list of target development types (from source development type and transition
expressions [which may need to be JIT-compiled]), creating new development types
(as needed), doing the area accounting correctly (without creating or destroying any area)
and compiling the products from the action (which gets a bit complicated in the case of
partial cuts...).
:param tuple dtype_key: The key identifying the development type.
:param str acode: The action code to apply.
:param int period: The period in which to apply the action.
:param int age: The age at which to apply the action.
:param float area: The area to apply the action on.
:param bool override_operability: If True, overrides operability limits. Default is False.
:param bool fuzzy_age: If True, attempts to apply action to proximal age class if specified age is not operable.
:param bool recourse_enabled: If True, uses default AreaSelector to patch missing area. Default is True.
:param bool areaselector: The AreaSelector object to use for patching missing area. Default is None.
:param bool compile_t_ycomps: If True, compiles time-indexed yield components. Default is False.
:param bool compile_c_ycomps: If True, compiles complex yield components. Default is False.
:param bool verbose: If True, prints additional information for debugging purposes. Default is False.
Returns (errorcode, missing_area, target_dt) triplet, where errorcode is an error code,
missing_area is the missing area, and target_dt is a list of (dtk, tprop, targetage)
triplets (one triplet per target development type).
:return: A tuple containing errorcode, missing_area, and target_dt.
**Error codes:**
1. invalid area argument
2. requested action not defined for development type
3. requested action defined, but never operable
4. action not operable
5. transitions not defined for action
"""
if area <= 0. and not override_operability: return 1, None, None
if verbose > 1:
print('applying action', [' '.join(dtype_key)], acode, period, age, area)
dt = self.dtypes[dtype_key]
if acode not in dt.oper_expr:
print('requested action not defined for development type...')
print(' ', [' '.join(dtype_key)], acode, period, age, area)
return 2, None, None
if acode not in dt.operability: # action not compiled yet...
if dt.compile_action(acode) == -1:
print('requested action is defined, but never not operable...')
print(' ', [' '.join(dtype_key)], acode, period, age, area)
return 3, None, None
if not dt.is_operable(acode, period, age) and not override_operability:
print('not operable')
print(' '.join(dt.key), acode, period, age)
print(dt.operability[acode][period])
return 4, None, None
if not any((acode, __age) in dt.transitions for __age in (age, -1)): # sanity check...
print('transitions not defined...')
print(' ', [' '.join(dtype_key)], acode, period, age, area)
print(dt.oper_expr)
print(dt.operability)
return 5, None, None
if dt.area(period, age) - area < self.area_epsilon:
# tweak area if slightly over or under, so we don't get any accounting drift...
area = dt.area(period, age)
missing_area = 0.
if dt.area(period, age) < area:
# insufficient area in dt to operate (infeasible)
# apply action to operable area, then look for missing area in adjacent ageclasses
if dt.area(period, age) > 0: # operate available area before applying recourse
print('insufficient area in dt to operate (infeasible)', dtype_key, period, age)
self.apply_action(dtype_key, acode, period, age, dt.area(period, age),
False, False, False, None, True)
missing_area = area - dt.area(period, age)
if fuzzy_age and missing_area:
for age_delta in [+1, -1, +2, -2]:
_age = age + age_delta
if dt.area(period, _age) > 0 and any((acode, __age) in dt.transitions for __age in (_age, -1)):
_area = min(missing_area, dt.area(period, _age))
self.apply_action(dtype_key, acode, period, _age, _area,
False, False, False, None, True)
missing_area -= _area
if missing_area < self.area_epsilon:
missing_area = 0.
break
if recourse_enabled and missing_area:
areaselector = self.areaselector if areaselector is None else areaselector
missing_area = areaselector.operate(period, acode, missing_area)
if missing_area < self.area_epsilon:
missing_area = 0.
action = self.actions[acode]
###########################################################################
dt.area(period, age, -area)
target_dt = []
__age = age if (acode, age) in dt.transitions else -1
for target in dt.transitions[acode, __age]:
tmask, tprop, tyield, tage, tlock, treplace, tappend = target # unpack tuple
dtk = list(dtype_key) # start with source key
###########################################################################
# DO TO: Confirm correct order for evaluating mask, _APPEND and _REPLACE...
dtk = [t if tmask[i] == '?' else tmask[i] for i, t in enumerate(dtk)]
if treplace: dtk[treplace[0]] = self.resolve_replace(dtk, treplace[1])
if tappend: dtk[tappend[0]] = self.resolve_append(dtk, tappend[1])
dtk = tuple(dtk)
###########################################################################
# import pdb; pdb.set_trace()
_dt = self.create_dtype_fromkey(dtk) if dtk not in self.dtypes else self.dtypes[dtk]
targetage = self.resolve_targetage(dtk, tyield, age, tage, acode)
_dt.area(period, targetage, area*tprop)
target_dt.append([dtk, tprop, targetage])
aa = self.applied_actions[period][acode]
if dtype_key not in aa: aa[dtype_key] = {}
if age not in aa[dtype_key]: aa[dtype_key][age] = [0., {}]
aa[dtype_key][age][0] += area
for yname in dt.ycomps():
ycomp = dt.ycomp(yname)
if ycomp.type == 't' and not compile_t_ycomps: continue # skip time-indexed ycomps
if ycomp.type == 'c' and not compile_c_ycomps: continue # skip complex ycomps
if yname in action.partial:
value = 0.
for dtk, tprop, targetage in target_dt:
_dt = self.dtypes[dtk]
_value = 0.
if yname in dt.ycomps():
if yname in _dt.ycomps():
_value = (dt.ycomp(yname)[age] - _dt.ycomp(yname)[targetage])
else:
_value = dt.ycomp(yname)[age]
if _value > 0.:
value += _value * tprop
else:
if verbose:
if _value < 0:
print('negative partial value', acode, yname, tprop, _value)
print(' ', ''.join(dtype_key), age)
print(' ', ''.join(dtk), targetage)
print()
else: # not partial
value = dt.ycomp(yname)[age]
if value != 0.:
aa[dtype_key][age][1][yname] = value
return 0, missing_area, target_dt
[docs]
def create_dtype_fromkey(self, key):
"""
Creates a new development type, given a key (checks for existing, auto-assigns yield compompontents,
auto-assign actions and transitions, checks for operability (filed under inoperable if applicable).
"""
assert key not in self.dtypes # should not be creating new dtypes from existing key
dt = DevelopmentType(key, self)
self.dtypes[key] = dt
for mask, t, ycomps in self.yields:
if self.match_mask(mask, key):
for yname, ycomp in ycomps:
dt.add_ycomp(t, yname, ycomp)
# assign actions and transitions
for acode in self.oper_expr:
for mask in self.oper_expr[acode]:
if self.match_mask(mask, key):
dt.oper_expr[acode].append(self.oper_expr[acode][mask])
for mask in self.transitions[acode]:
if self.match_mask(mask, key):
for scond in self.transitions[acode][mask]:
for x in self.resolve_condition(scond, key):
dt.transitions[acode, x] = self.transitions[acode][mask][scond]
if not dt.transitions:
self.inoperable_dtypes.append(key)
return dt
def _resolve_outputs_buffer(self, s, for_flag=None):
n = self.nthemes()
group = 'no_group' # outputs declared at top of file assigned to 'no_group'
self.output_groups[group] = set()
ocode = ''
buffering_for = False
s = re.sub(r'\{.*?\}', '', s, flags=re.M|re.S) # remove curly-bracket comments
for l in re.split(r'[\r\n]+', s, flags=re.M|re.S):
if re.match(r'^\s*(;|$)', l): continue # skip comments and blank lines
matches = re.findall(r'#[A-Za-z0-9_]*', l)
for m in matches: # replace CONSTANTS variables with value
try:
l = l.replace(m, str(self.constants[m[1:].lower()]))
except:
import sys
print(sys.exc_info()[0])
print(l)
print(matches, m)
assert False
if buffering_for:
if l.strip().startswith('ENDFOR'):
for i in range(for_lo, for_hi+1):
ss = '\n'.join(for_buffer).replace(for_var, str(i))
self._resolve_outputs_buffer(ss, for_flag=i)
buffering_for = False
continue
else:
for_buffer.append(l)
continue
l = re.sub(r'\s+', ' ', l) # separate tokens by single space
l = l.strip().partition(';')[0].strip()
l = l.replace(' (', '(') # remove space to left of left parentheses
t = l.lower().split(' ')
##################################################
# HACK ###########################################
# substitute ugly symbols have in ocodes...
l = l.replace(r'%', 'p')
l = l.replace(r'$', 's')
##################################################
tokens = l.lower().split(' ')
if l.startswith('*GROUP'):
keyword = 'group'
group = tokens[1].lower()
self.output_groups[group] = set()
elif l.startswith('FOR'):
# pattern matching may not be very robust, but works for now with:
# 'FOR XX := 1 to 99'
# TO DO: implement DOWNTO, etc.
for_var = re.search(r'(?<=FOR\s).+(?=:=)', l).group(0).strip()
for_lo = int(re.search(r'(?<=:=).+(?=to)', l).group(0))
for_hi = int(re.search(r'(?<=to).+', l).group(0))
for_buffer = []
buffering_for = True
continue
if l.startswith('*OUTPUT') or l.startswith('*LEVEL'):
keyword = 'output' if l.startswith('*OUTPUT') else 'level'
if ocode: # flush data collected from previous lines
self.outputs[ocode] = Output(parent=self,
code=ocode,
expression=expression,
description=description,
theme_index=theme_index)
tt = tokens[1].split('(')
ocode = tt[0]
theme_index = tt[1][3:-1] if len(tt) > 1 else None
description = ' '.join(tokens[2:])
expression = ''
self.output_groups[group].add(ocode)
if keyword == 'level':
self.outputs[ocode] = Output(parent=self,
code=ocode,
expression=expression,
description=description,
theme_index=theme_index,
is_level=True)
ocode = ''
elif l.startswith('*SOURCE'):
keyword = 'source'
expression += l[8:]
elif keyword == 'source': # continuation line of SOURCE expression
expression += ' '
expression += l
[docs]
def import_outputs_section(self, filename_suffix='out'):
"""
Imports OUTPUTS section from a Forest model.
"""
with open('%s/%s.%s' % (self.model_path, self.model_name, filename_suffix)) as f:
s = f.read()
self._resolve_outputs_buffer(s)
[docs]
def add_theme(self, name, basecodes=[], aggs={}, description=''):
"""
Adds a theme to the model.
:param str name: The name of theme.
:param list basecodes: List of base codes for the theme.
:param dict aggs: Dictionary containing aggregated values for the theme.
:param str description: Description of the theme.
"""
self._themes.append({})
#self.nthemes +- 1
self._themes[-1]['__name__'] = name
self._themes[-1]['__description__'] = description
if basecodes: self._theme_basecodes.append([])
for c in basecodes:
self._themes[-1][c] = c
self._theme_basecodes[-1].append(c)
for c in aggs:
self._themes[-1][c] = aggs[c]
[docs]
def import_landscape_section(self, filename_suffix='lan', ti_offset=0):
"""
Imports LANDSCAPE section from a Forest model.
"""
with open('%s/%s.%s' % (self.model_path, self.model_name, filename_suffix)) as f:
data = f.read()
_data = re.search(r'\*THEME.*', data, re.M|re.S).group(0) # strip leading junk
t_data = re.split(r'\*THEME.*\n', _data)[1:] # split into theme-wise chunks
for ti, t in enumerate(t_data, start=ti_offset):
self._themes.append({})
self._themes[-1]['__name__'] = 'theme%i' % ti
self._themes[-1]['__description__'] = '' # TO DO: extract comments in theme declaration
self._theme_basecodes.append([])
defining_aggregates = False
for l in [l for l in t.split('\n') if not re.match(r'^\s*(;|{|$)', l)]:
if re.match(r'^\s*\*AGGREGATE', l): # aggregate theme attribute code
tac = re.split(r'\s+', l.strip())[1].lower()
self._themes[ti][tac] = []
defining_aggregates = True
continue
if not defining_aggregates: # line defines basic theme attribute code
tac = re.search(r'\S+', l.strip()).group(0).lower()
self._themes[ti][tac] = tac
self._theme_basecodes[ti].append(tac)
else: # line defines aggregate values (parse out multiple values before comment)
_tacs = [_tac.lower() for _tac in re.split(r'\s+', l.strip().partition(';')[0].strip())]
self._themes[ti][tac].extend(_tacs)
[docs]
def theme_basecodes(self, theme_index):
"""
Return list of base codes, given theme index.
"""
return self._theme_basecodes[theme_index]
[docs]
def import_areas_section(self,
model_path=None,
model_name=None,
filename_suffix='are',
import_empty=False,
convert_periods_to_years=None):
"""
Imports AREAS section from a Forest model.
.. note::
- This method parses the AREAS section from a Forest model file.
- Each line in the section represents an area for a specific development type and age class.
- The section should contain information about development type keys, ages, and corresponding areas.
- Empty areas (with values less than the area_epsilon) can be skipped if import_empty is False.
"""
n = self.nthemes()
model_path = self.model_path if not model_path else model_path
model_name = self.model_name if not model_name else model_name
multiplier = self._resolve_period_multiplier(convert_periods_to_years)
with open('%s/%s.%s' % (model_path, model_name, filename_suffix)) as f:
for l in f:
try:
if re.match(r'^\s*(;|$)', l): continue # skip comments and blank lines
l = l.lower().strip().partition(';')[0] # strip leading whitespace and trailing comments
t = re.split(r'\s+', l)
key = tuple(_t for _t in t[1:n+1])
age = int(t[n+1]) * multiplier
area = float(t[n+2].replace(',', ''))
if area < self.area_epsilon and not import_empty: continue
if key not in self.dtypes: self.dtypes[key] = DevelopmentType(key, self)
self.dtypes[key].area(0, age, area)
except Exception as e:
print('Failed AREAS import on line: \n%s' % l)
return 1
return 0
def _expand_action(self, c):
self._actions = t
return [c] if t[c] == c else list(_cfi(self._expand_action(t, c) for c in t[c]))
def _expand_theme(self, t, c, verbose=False): # depth-first search recursive aggregate theme code expansion
if verbose > 1:
print('ws3.forest.ForestModel._expand_theme', t, c)
print(c)
return [c] if t[c] == c else list(_cfi(self._expand_theme(t, c) for c in t[c]))
[docs]
def match_mask(self, mask, key):
"""
Returns True if key matches mask.
"""
for ti, tac in enumerate(mask):
if tac == '?': continue # wildcard matches all keys
tacs = self._expand_theme(self._themes[ti], tac)
if key[ti] not in tacs: return False # reject key
return True # key matches
[docs]
def unmask(self, mask, verbose=0):
"""
Iteratively filter list of development type keys using mask values.
Accepts Woodstock-style string masks to facilitate cut-and-paste testing.
"""
if isinstance(mask, str): # Woodstock-style string mask format
mask = tuple(re.sub(r'\s+', ' ', mask).lower().split(' '))
assert len(mask) == self.nthemes() # must be bad mask if wrong theme count
else:
try:
assert isinstance(mask, tuple) and len(mask) == self.nthemes()
except:
print(len(mask), type(mask), mask)
assert False
dtype_keys = copy.copy(list(self.dtypes.keys())) # filter this
for ti, tac in enumerate(mask):
if tac == '?': continue # wildcard matches all
tacs = self._expand_theme(self._themes[ti], tac, verbose=verbose) if tac in self._themes[ti] else []
dtype_keys = [dtk for dtk in dtype_keys if dtk[ti] in tacs] # exclude bad matches
return dtype_keys
[docs]
def import_constants_section(self, filename_suffix='con'):
"""
Imports CONSTANTS section from a Forest model.
.. note::
- This method parses the CONSTANTS section from a Forest model file.
- Each line in the section represents a constant and its value.
- Constants are stored in a dictionary where the keys are the constant names and the values are their respective values.
- The section should contain information about various constants used in the model.
"""
with open('%s/%s.%s' % (self.model_path, self.model_name, filename_suffix)) as f:
for lnum, l in enumerate(f):
if re.match(r'^\s*(;|$)', l): continue # skip comments and blank lines
l = l.strip().partition(';')[0].strip() # strip leading whitespace, trailing comments
t = re.split(r'\s+', l)
self.constants[t[0].lower()] = float(t[1])
[docs]
def import_yields_section(self,
filename_suffix='yld',
mask_func=None,
verbose=False,
convert_periods_to_years=None):
"""
Imports YIELDS section from a Forest model.
"""
def flush_ycomps(t, m, n, c):
if t == 'a': # age-based ycomps
_c = lambda y: self.register_curve(core.Curve(y,
points=c[y],
type='a',
period_length=self.period_length))
ycomps = [(y, _c(y)) for y in n]
elif t == 't': # time-based ycomps (skimp on x range)
_c = lambda y: self.register_curve(core.Curve(y,
points=c[y],
type='t',
xmax=self.horizon,
period_length=self.period_length))
ycomps = [(y, _c(y)) for y in n]
else: # complex ycomps
ycomps = [(y, c[y]) for y in n]
self.yields.append((m, t, ycomps)) # stash for creating new dtypes at runtime...
self.ynames.update(n)
for k in self.unmask(m):
for yname, ycomp in ycomps:
self.dtypes[k].add_ycomp(t, yname, ycomp)
self._resolve_period_multiplier(convert_periods_to_years)
age_multiplier = self._period_to_years_factor or 1
period_step = self._period_to_years_factor or self.period_length
n = self.nthemes()
ytype = ''
mask = ('?',) * self.nthemes()
ynames = []
data = None
with open('%s/%s.%s' % (self.model_path, self.model_name, filename_suffix)) as f:
for lnum, l in enumerate(f):
if re.match(r'^\s*(;|$)', l): continue # skip comments and blank lines
l = l.strip().partition(';')[0].strip() # strip leading whitespace and trailing comments
t = re.split(r'\s+', l)
if t[0].startswith('*Y'): # new yield definition
newyield = True
flush_ycomps(ytype, mask, ynames, data) # apply yield from previous block
ytype = self._ytypes[t[0]]
mask = tuple(_t.lower() for _t in t[1:])
mask = mask_func(mask) if mask_func else mask
if verbose: print(lnum, ' '.join(mask))
continue
if newyield:
if t[0] == '_AGE':
is_tabular = True
ynames = [_t.lower() for _t in t[1:]]
data = {yname:[] for yname in ynames}
newyield = False
continue
else:
is_tabular = False
ynames = []
data = {}
newyield = False
else:
if t[0] == '_AGE': # same yield block, new table
flush_ycomps(ytype, mask, ynames, data) # apply yield from previous block
is_tabular = True
ynames = [_t.lower() for _t in t[1:]]
data = {yname:[] for yname in ynames}
newyield = False
continue
if is_tabular:
try:
x = int(t[0]) * age_multiplier
except:
print(lnum, l)
for i, yname in enumerate(ynames):
data[yname].append((x, float(t[i+1])))
else:
if ytype in 'at': # standard or time-based yield (extract xy values)
if not common.is_num(t[0]): # first line of row-based yield component
yname = t[0].lower()
ynames.append(yname)
start = int(t[1]) * period_step
data[yname] = [(start + (i * period_step), float(t[i+2]))
for i in range(len(t) - 2)]
else: # continuation of row-based yield compontent
x_last = data[yname][-1][0]
if len(data[yname]) >= 2:
step = data[yname][1][0] - data[yname][0][0]
else:
step = period_step
if not step:
step = period_step
data[yname].extend([(x_last + step * (i + 1), float(t[i]))
for i in range(len(t))])
else:
yname = t[0].lower()
ynames.append(yname)
data[yname] = ' '.join(t[1:]) # complex yield (defer interpretation)
flush_ycomps(ytype, mask, ynames, data)
[docs]
def import_actions_section(self,
filename_suffix='act',
mask_func=None,
nthemes=None,
convert_periods_to_years=None):
"""
Imports ACTIONS section from a Forest model.
.. note::
- This method parses the ACTIONS section from a Forest model file.
- The section should contain information about actions, operability, aggregates, and partials.
- Each action is represented by a code, description, and operability condition.
- Operability conditions are defined for different masks.
- Aggregates combine multiple actions.
- Partials represent components of an action.
"""
nthemes = nthemes if nthemes else self.nthemes()
multiplier = self._resolve_period_multiplier(convert_periods_to_years)
def scale_expr(expr):
if multiplier == 1:
return expr
pattern = re.compile(r'(_age\s*(?:>=|<=|=|<|>)\s*)(\d+)')
def repl(match):
return f"{match.group(1)}{int(int(match.group(2)) * multiplier)}"
return pattern.sub(repl, expr)
actions = {}
aggregates = {}
partials = {}
keyword = ''
with open('%s/%s.%s' % (self.model_path, self.model_name, filename_suffix)) as f: s = f.read().lower()
s = re.sub(r'\{.*?\}', '', s, flags=re.M|re.S) # remove curly-bracket comments
for l in re.split(r'[\r\n]+', s, flags=re.M|re.S):
if re.match(r'^\s*(;|$)', l): continue # skip comments and blank lines
l = l.strip().partition(';')[0].strip() # strip leading whitespace and trailing comments
l = re.sub(r'r\s+', ' ', l) # separate tokens by single space
tokens = l.split(' ')
if l.startswith('*action'):
keyword = 'action'
acode = tokens[1]
targetage = 0 if tokens[2] == 'y' else None
descr = ' '.join(tokens[3:])
lockexempt = '_lockexempt' in tokens
self.actions[acode] = Action(acode, targetage, descr, lockexempt)
self.oper_expr[acode] = {}
elif l.startswith('*operable'):
keyword = 'operable'
acode = tokens[1]
elif l.startswith('*aggregate'):
keyword = 'aggregate'
acode = tokens[1]
self.actions[acode] = Action(acode)
elif l.startswith('*partial'):
keyword = 'partial'
acode = tokens[1]
partials[acode] = []
else: # continuation of OPERABLE, AGGREGATE, or PARTIAL block
if keyword == 'operable':
mask = tuple(tokens[:nthemes])
mask = mask_func(mask) if mask_func else mask
expression = ' '.join(tokens[nthemes:])
self.oper_expr[acode][mask] = scale_expr(expression)
elif keyword == 'aggregate':
self.actions[acode].components.extend(tokens)
elif keyword == 'partial':
self.actions[acode].partial.extend(tokens)
for acode, a in list(self.actions.items()):
if a.components: continue # aggregate action, skip
for mask, expression in list(self.oper_expr[acode].items()):
for k in self.unmask(mask):
#if acode == 'act1': print ' '.join(k), acode, expression
self.dtypes[k].oper_expr[acode].append(expression)
[docs]
def resolve_treplace(self, dt, treplace):
if '_TH' in treplace: # assume incrementing integer theme value
i = int(re.search(r'(?<=_TH)\w+', treplace).group(0))
return eval(re.sub('_TH%i'%i, str(dt.key[i-1]), treplace))
else:
assert False # many other possible arguments (see Forest documentation)
[docs]
def resolve_tappend(self, dt, tappend):
"""
This feature has not been implemented yet.
"""
assert False # brick wall (not implemented yet)
[docs]
def resolve_tmask(self, dt, tmask, treplace, tappend):
"""
Returns new developement type key (tuple of values, one per theme), given developement type and (treplace, tappend) expressions.
"""
key = list(dt.key)
if treplace:
key[treplace[0]] = resolve_treplace(dt, treplace[1])
if tappend:
key[tappend[0]] = resolve_tappend(dt, tappend[1])
for i, val in enumerate(tmask):
if theme == '?': continue # wildcard (skip it)
key[i] = val
return tuple(key)
[docs]
def resolve_condition(self, condition, dtype_key=None):
"""
Evaluate @AGE or @YLD condition.
Returns list of ages.
"""
if not condition:
#return self.ages
return [-1]
elif condition.startswith('@AGE'):
lo, hi = [int(a) for a in condition[5:-1].split('..')]
multiplier = self._period_to_years_factor or 1
if multiplier != 1:
lo *= multiplier
hi *= multiplier
step = multiplier
else:
step = 1
return list(range(lo, hi + step, step))
elif condition.startswith('@YLD'):
args = re.split(r'\s?,\s?', condition[5:-1])
yname = args[0].lower()
lo, hi = [float(y) for y in args[1].split('..')]
dt = self.dtypes[dtype_key]
lo_age, hi_age = dt.ycomp(yname).range(lo, hi, as_bounds=True)
return list(range(lo_age, hi_age+1))
#return self.dtypes[dtype_key].resolve_condition(yname, hi, lo)
[docs]
def import_transitions_section(self,
filename_suffix='trn',
mask_func=None,
nthemes=None,
convert_periods_to_years=None):
"""
Imports TRANSITIONS section from a Forest model.
"""
nthemes = nthemes if nthemes else self.nthemes()
self._resolve_period_multiplier(convert_periods_to_years)
age_multiplier = self._period_to_years_factor or 1
def flush_transitions(acode, sources):
if not acode: return # nothing to flush on first loop
self.transitions[acode] = {}
for smask, scond in sources:
# store transition data for future dtypes creation
if smask not in self.transitions[acode]:
self.transitions[acode][smask] = {}
self.transitions[acode][smask][scond] = sources[smask, scond]
# assign transitions to existing dtypes
for k in self.unmask(smask):
dt = self.dtypes[k]
for x in self.resolve_condition(scond, k): # store targets
dt.transitions[acode, x] = sources[smask, scond]
acode = None
with open('%s/%s.%s' % (self.model_path, self.model_name, filename_suffix)) as f:
s = f.read()
s = re.sub(r'\{.*?\}', '', s, flags=re.M|re.S) # remove curly-bracket comments
for l in re.split(r'[\r\n]+', s, flags=re.M|re.S):
if re.match(r'^\s*(;|$)', l): continue # skip comments and blank lines
l = l.strip().partition(';')[0].strip() # strip leading whitespace, trailing comments
tokens = re.split(r'\s+', l)
if l.startswith('*CASE'):
if acode: flush_transitions(acode, sources)
acode = tokens[1].lower()
sources = {}
elif l.startswith('*SOURCE'):
smask = tuple(t.lower() for t in tokens[1:nthemes+1])
smask = mask_func(smask) if mask_func else smask
match = re.search(r'@.+\)', l)
scond = match.group(0) if match else ''
sources[(smask, scond)] = []
elif l.startswith('*TARGET'):
tmask = tuple(t.lower() for t in tokens[1:nthemes+1])
tmask = mask_func(tmask) if mask_func else tmask
tprop = float(tokens[nthemes+1]) * 0.01
tyield = None
if len(tokens) > nthemes+2 and tokens[nthemes+2].lower() in self.ynames:
tyield = (tokens[nthemes+2].lower(), float(tokens[nthemes+3]))
try: # _AGE keyword
tage = int(tokens[tokens.index('_AGE')+1]) * age_multiplier
except:
tage = None
try: # _LOCK keyword
tlock = int(tokens[tokens.index('_LOCK')+1]) * age_multiplier
except:
tlock = None
try: # _REPLACE keyword (TO DO: implement other cases)
args = re.split(r'\s?,\s?', re.search(r'(?<=_REPLACE\().*(?=\))', l).group(0))
theme_index = int(args[0][3]) - 1
treplace = theme_index, args[1]
except:
treplace = None
try: # _APPEND keyword (TO DO: implement other cases)
args = re.split(r'\s?,\s?', re.search(r'(?<=_APPEND\().*(?=\))', l).group(0))
theme_index = int(args[0][3]) - 1
tappend = theme_index, args[1]
except:
tappend = None
sources[(smask, scond)].append((tmask, tprop, tyield, tage, tlock, treplace, tappend))
flush_transitions(acode, sources)
[docs]
def import_optimize_section(self, filename_suffix='opt'):
"""
Imports OPTIMIZE section from a Forest model.
.. warning::
Not implemented yet.
"""
pass
[docs]
def import_graphics_section(self, filename_suffix='gra'):
"""
Imports GRAPHICS section from a Forest model.
.. warning::
Not implemented yet.
"""
pass
[docs]
def import_lifespan_section(self, filename_suffix='lif'):
"""
Imports LIFESPAN section from a Forest model.
.. warning::
Not implemented yet.
"""
pass
[docs]
def import_schedule_section(self,
filename_suffix='seq',
replace_commas=True,
filename_prefix=None,
convert_periods_to_years=None):
"""
Imports SCHEDULE section from a Forest model.
"""
filename_prefix = self.model_name if filename_prefix is None else filename_prefix
age_multiplier = self._resolve_period_multiplier(convert_periods_to_years)
schedule = []
n = self.nthemes()
with open('%s/%s.%s' % (self.model_path, filename_prefix, filename_suffix)) as f:
for lnum, l in enumerate(f):
if re.match(r'^\s*(;|$)', l): continue # skip comments and blank lines
l = l.lower().strip().partition(';')[0].strip() # strip leading whitespace and trailing comments
t = re.split(r'\s+', l)
if len(t) != n + 5: break
dtype_key = tuple(t[:n])
age = int(t[n]) * age_multiplier
area = float(t[n+1].replace(',', '')) if replace_commas else float(t[n+1])
acode = t[n+2]
period = int(t[n+3])
etype = t[n+4] if len(t) >= n+4 else ''
schedule.append((dtype_key, age, area, acode, period, etype))
if area <= 0: print('area <= 0', l)
return schedule
[docs]
def compile_schedule(self, problem=None):
"""
Compiles the schedule of actions.
If a problem is specified, compiles the schedule from the given problem data.
Otherwise, compiles the schedule from the data in `self.applied_actions`.
"""
if problem is not None:
return self._compile_schedule_from_problem(problem)
else: # use data in self.applied_actions
return self._compile_schedule_from_actions()
def _compile_schedule_from_actions(self):
result = []
for period in self.periods:
aa = self.applied_actions[period]
for acode in aa.keys():
for dtk in aa[acode].keys():
etype = '_existing' if self.dt(dtk).area(0) else '_future'
for age in aa[acode][dtk].keys():
area = aa[acode][dtk][age][0]
result.append((dtk, age, area, acode, period, etype))
return result
[docs]
def apply_schedule(self, schedule, max_period=None, verbose=False,
fail_on_missingarea=False, force_integral_area=False,
override_operability=False, fuzzy_age=True,
recourse_enabled=True, areaselector=None,
compile_t_ycomps=False, compile_c_ycomps=False,
rounding_bias=0.15, scale_area=None, reset=True):
"""
Assumes schedule in format returned by import_schedule_section().
That is: list of (dtype_key, age, area, acode, period, etype) tuples.
Also assumes that actions in list are sorted by applied period.
:param list schedule: The schedule of actions to apply.
:param int max_period: The maximum period to apply actions for. If None, defaults to the horizon.
:param bool verbose: If True, prints additional information for debugging purposes. Default is False.
:param bool fail_on_missingarea: If True, raises an exception if missing area is encountered. Default is False.
:param bool force_integral_area: If True, forces the area to be integral. Default is False.
:param bool override_operability: If True, overrides operability limits. Default is False.
:param bool fuzzy_age: If True, attempts to apply action to proximal age class if specified age is not operable.
Default is True.
:param bool recourse_enabled: If True, uses default AreaSelector to patch missing area. Default is True.
:param object areaselector: The AreaSelector object to use for patching missing area. Default is None.
:param bool compile_t_ycomps: If True, compiles time-indexed yield components. Default is False.
:param bool compile_c_ycomps: If True, compiles complex yield components. Default is False.
:param float rounding_bias: The rounding bias to use when forcing integral area. Default is 0.15.
:param scale_area: The scaling factor to apply to the area. Default is None.
:param bool reset: If True, resets the model before applying the schedule. Default is True.
:return: The missing area (float) after applying the schedule.
"""
if max_period is None: max_period = self.horizon
if reset: self.reset()
missing_area = 0.
for _period in self.periods:
for dtype_key, age, area, acode, period, etype in schedule:
if period != _period: continue
if scale_area: area = area * scale_area
if period > _period:
if verbose: print('apply_schedule: committing actions for period', _period, '(missing area %0.1f)' % missing_area)
self.commit_actions(_period)
if period > max_period: return
if force_integral_area:
area = round(area+rounding_bias)
if not area: continue
assert not area % 1.
e, _aa, _ = self.apply_action(dtype_key,
acode,
period,
age,
area,
override_operability=override_operability,
fuzzy_age=fuzzy_age,
recourse_enabled=recourse_enabled,
areaselector=areaselector,
compile_t_ycomps=compile_t_ycomps,
compile_c_ycomps=compile_c_ycomps,
verbose=verbose)
crash_on_apply_action_error = False # hack... put in method signature later
if crash_on_apply_action_error:
assert not e # crash on error (TO DO: better error handling)
else:
if e:
print('apply action error', e, dtype_key, acode, period, age, area)
if isinstance(_aa, float):
if fail_on_missingarea and missing_area:
raise
else:
missing_area += _aa
if verbose: print('missing area %0.1f (%0.2f)' % (_aa, _aa/area))
_period = period
self.commit_actions(_period)
return missing_area
[docs]
def import_control_section(self, filename_suffix='run'):
"""
Imports CONTROL section from a Forest model.
.. warning::
Not implemented yet.
"""
pass
[docs]
def grow(self, start_period=1, cascade=True):
"""
Simulates growth (default startint at period 1 and cascading to the end of the planning horizon).
"""
for dt in list(self.dtypes.values()): dt.grow(start_period, cascade)
def _cbm_sit_classifiers(self):
"""
Compile sit_classifiers dataframe.
"""
data = {'classifier_id':[], 'name':[], 'description':[]}
for i, theme in enumerate(self._themes):
data['classifier_id'].append(i+1)
data['name'].append('_CLASSIFIER')
data['description'].append(theme['__name__'])
for v in self.theme_basecodes(i):
data['classifier_id'].append(i+1)
data['name'].append(v)
data['description'].append(v) # not great descriptions, but no effect on CBM output
data['classifier_id'].append(self.nthemes()+1)
data['name'].append('_CLASSIFIER')
data['description'].append('species')
data['classifier_id'].append(self.nthemes()+1)
data['name'].append('softwood')
data['description'].append('softwood')
data['classifier_id'].append(self.nthemes()+1)
data['name'].append('hardwood')
data['description'].append('hardwood')
result = pd.DataFrame(data)
return result
def _cbm_sit_disturbance_types(self):
"""
Compile sit_disturbance_types dataframe.
"""
acodes = [acode for acode in self.actions.keys() if acode != 'null'] + ['fire']
data = {'id':acodes, 'name':acodes}
result = pd.DataFrame(data)
return result
def _cbm_sit_age_classes(self):
"""
Compile sit_age_classes dataframe.
"""
data = {'name':['age_0'],
'class_size':[0],
'start_year':[0],
'end_year':[0]}
for i, ac in enumerate(range(self.period_length,
self.max_age+self.period_length,
self.period_length)):
data['name'].append('age_%i' % (i+1))
data['class_size'].append(self.period_length)
data['start_year'].append(ac - self.period_length + 1)
data['end_year'].append(ac)
result = pd.DataFrame(data)
return result
def _cbm_sit_inventory(self, softwood_volume_yname, hardwood_volume_yname, default_last_pass_disturbance='fire',
include_empty_dtypes=False):
"""
Compile sit_inventory dataframe.
"""
def leading_species(r):
"""
Determine if softwood or hardwood leading species by comparing softwood and hardwood
volume at peak MAI age.
"""
dt = self.dtypes[tuple(r.tolist())]
svol_curve, hvol_curve = dt.ycomp(softwood_volume_yname), dt.ycomp(hardwood_volume_yname)
tvol_curve = svol_curve + hvol_curve
x_cmai = tvol_curve.mai().ytp().lookup(0)
return 'softwood' if svol_curve[x_cmai] > hvol_curve[x_cmai] else 'hardwood'
def landclass(r):
"""
The landclass column values should contain an integer in the range [0, 22], which CBM
maps to one of 23 UNFCCC land classes (see Table 3-1 in the the CBM-CFS3 user guide.
Use the value of the 'landclass' attribute of the corresponding development type (if defined),
otherwise default to 0 ('forest land remaining forest land').
"""
dt = self.dtypes[tuple(r.tolist())]
if hasattr(dt, 'landclass'):
return dt.landclass
else:
return '0'
def last_pass_disturbance(r):
"""
We use the value of the 'last_pass_disturbance' attribute of the corresponding development
type (if defined), otherwise default to default_last_pass_disturbance.
"""
dt = self.dtypes[tuple(r.tolist())]
if hasattr(dt, 'last_pass_disturbance'):
return dt.last_pass_disturbance
else:
return default_last_pass_disturbance
theme_cols = [theme['__name__'] for theme in self._themes]
other_cols = ['species',
'using_age_class',
'age',
'area',
'delay',
'landclass',
'historic_disturbance',
'last_pass_disturbance']
data = {**{c:[] for c in theme_cols}, **{c:[] for c in ['age', 'area']}}
for dtype_key in self.dtypes:
dt = self.dtypes[dtype_key]
if include_empty_dtypes:
if not dt._areas[0]: dt._areas[0] = {0:0.} # force a non-null initial inventory
else:
if not dt._areas[0]: continue # developement type not in initial inventory
for age, area in dt._areas[0].items():
for i, c in enumerate(theme_cols): data[c].append(dtype_key[i])
data['age'].append(age)
data['area'].append(area)
result = pd.DataFrame(data)
age_series, area_series = result['age'], result['area']
result.drop(['age', 'area'], axis=1, inplace=True) # wrong column order
result['species'] = result[theme_cols].apply(leading_species, axis=1)
result['using_age_class'] = 'FALSE'
result['age'] = age_series
result['area'] = area_series
result['delay'] = 0
result['landclass'] = result[theme_cols].apply(landclass, axis=1)
result['historic_disturbance'] = 'fire'
result['last_pass_disturbance'] = result[theme_cols].apply(last_pass_disturbance, axis=1)
return result
def _cbm_sit_yield(self, softwood_volume_yname, hardwood_volume_yname, n_yield_vals):
"""
Compile sit_yields dataframe.
This is where the ws3 and CBM data models diverge and things get a bit messy.
The hack:
Define a bogus Model I optimization problem in ws3.
A side effect of generating the Model I problem matrix is to dynamically create
all possible DevelopmentType cases and add them to the ForestModel.
Then we can can use ForestModel.unmask to get at least one valid
DevelopmentType instance for each yield mask in ForestModel.yields
(there could be multiple, but just grap the first one in the list).
From there, use the same species-grokking logic we used to compile
the species column in sit_inventory. Ugly, but should work
as long as ForestModel.yields includes full-wildcard softwood and
hardwood complex yield curves defined with the _SUM(...) function (because
this way all DevelopmentType instances, initially present or dynamically
created, will automatically have softwood and hardwood yield curves defined).
The bogus Model I problem can just use a bogus z coefficient
function (that alwasy returns 0) and no flow or general constraints.
Maybe there is a better way?
"""
def leading_species(dt):
"""
Determine if softwood or hardwood leading species by comparing softwood and hardwood
volume at peak MAI age.
"""
svol_curve, hvol_curve = dt.ycomp(softwood_volume_yname), dt.ycomp(hardwood_volume_yname)
tvol_curve = svol_curve + hvol_curve
x_cmai = tvol_curve.mai().ytp().lookup(0)
return 'softwood' if svol_curve[x_cmai] > hvol_curve[x_cmai] else 'hardwood'
schedule = self.compile_schedule()
p = self.add_problem('__cbm_sit_bogus', {'z':(lambda forestmodel, path: 0.)})
theme_cols = [theme['__name__'] for theme in self._themes]
data = {**{c:[] for c in theme_cols},
'species':[], 'leading_species':[],
**{'v%i' % i:[] for i in range(n_yield_vals + 1)}}
for dtype_key in self.dtypes:
dt = self.dt(dtype_key)
dt.leading_species = leading_species(dt)
for species, yname in zip(('softwood', 'hardwood'), (softwood_volume_yname, hardwood_volume_yname)):
for i, c in enumerate(theme_cols): data[c].append(dtype_key[i])
data['species'].append(species)
data['leading_species'].append(dt.leading_species)
for i in range(n_yield_vals + 1): data['v%i' % i].append(dt.ycomp(yname)[i * self.period_length])
result = pd.DataFrame(data)
self.apply_schedule(schedule) # running add_problem above broke the schedule so restore from backup we stashed
return result
def _cbm_sit_events(self):
theme_cols = [theme['__name__'] for theme in self._themes]
columns = theme_cols.copy()
columns += ['species',
'using_age_class',
'min_softwood_age',
'max_softwood_age',
'min_hardwood_age',
'max_hardwood_age',
'MinYearsSinceDist',
'MaxYearsSinceDist',
'LastDistTypeID',
'MinTotBiomassC',
'MaxTotBiomassC',
'MinSWMerchBiomassC',
'MaxSWMerchBiomassC',
'MinHWMerchBiomassC',
'MaxHWMerchBiomassC',
'MinTotalStemSnagC',
'MaxTotalStemSnagC',
'MinSWStemSnagC',
'MaxSWStemSnagC',
'MinHWStemSnagC',
'MaxHWStemSnagC',
'MinTotalStemSnagMerchC',
'MaxTotalStemSnagMerchC',
'MinSWMerchStemSnagC',
'MaxSWMerchStemSnagC',
'MinHWMerchStemSnagC',
'MaxHWMerchStemSnagC',
'efficiency',
'sort_type',
'target_type',
'target',
'disturbance_type',
'disturbance_year']
data = {c:[] for c in columns}
for dtype_key, age, area, acode, period, _ in self.compile_schedule():
#set_trace()
for i, c in enumerate(theme_cols): data[c].append(dtype_key[i])
data['species'].append(self.dt(dtype_key).leading_species)
data['using_age_class'].append('FALSE')
#############################################################################
# might need to be more flexible with age range, to avoid OBO bugs and such?
data['min_softwood_age'].append(-1)
data['max_softwood_age'].append(-1)
data['min_hardwood_age'].append(-1)
data['max_hardwood_age'].append(-1)
# data['min_softwood_age'].append(age)
# data['max_softwood_age'].append(age)
# data['min_hardwood_age'].append(age)
# data['max_hardwood_age'].append(age)
#############################################################################
for c in columns[len(theme_cols)+6:-6]: data[c].append(-1)
data['efficiency'].append(1)
data['sort_type'].append(3) # oldest first (see Table 3-3 in the CBM-CFS3 user guide)
data['target_type'].append('A') # area target
data['target'].append(area)
data['disturbance_type'].append(acode)
data['disturbance_year'].append(period*self.period_length)
result = pd.DataFrame(data)
return result
def _cbm_sit_transitions(self, null_acode='null'):
def resolve_target(dtype_key, target, sage):
tmask, tprop, tyield, tage, tlock, treplace, tappend = target # unpack tuple
dtk = list(dtype_key) # start with source key
dtk = [t if tmask[i] == '?' else tmask[i] for i, t in enumerate(dtk)]
if treplace: dtk[treplace[0]] = self.resolve_replace(dtk, treplace[1])
if tappend: dtk[tappend[0]] = self.resolve_append(dtk, tappend[1])
dtk = tuple(dtk)
targetage = self.resolve_targetage(dtk, tyield, sage, tage, acode)
return dtk, targetage
theme_cols = colunmns = [theme['__name__'] for theme in self._themes]
columns = theme_cols.copy()
columns += ['species',
'using_age_class',
'min_softwood_age',
'max_softwood_age',
'min_hardwood_age',
'max_hardwood_age',
'disturbance_type']
columns += ['to_%s' % c for c in theme_cols]
columns += ['to_species',
'regen_delay',
'reset_age',
'percent']
data = {c:[] for c in columns}
for dtype_key in self.dtypes:
dt = self.dt(dtype_key)
for acode, sage in dt.transitions:
if acode == null_acode: continue
for target in dt.transitions[acode, sage]:
for i, c in enumerate(theme_cols): data[c].append(dtype_key[i])
data['species'].append(dt.leading_species)
data['using_age_class'].append('FALSE')
data['min_softwood_age'].append(sage)
data['max_softwood_age'].append(sage)
data['min_hardwood_age'].append(sage)
data['max_hardwood_age'].append(sage)
data['disturbance_type'].append(acode)
to_dtype_key, to_age = resolve_target(dtype_key, target, sage)
for i in range(len(theme_cols)): data['to_theme%i' % i].append(to_dtype_key[i]) # Monkey patch
target_dt = self.dt(to_dtype_key)
target_species = target_dt.leading_species if target_dt else dt.leading_species
data['to_species'].append(target_species)
data['regen_delay'].append(0)
data['reset_age'].append(to_age)
data['percent'].append(int(target[1] * 100))
result = pd.DataFrame(data)
return result
# --- orphaneed code? delete? ---
# for acode in fm.transitions:
# if acode == null_acode: continue
# for smask in self.transitions[acode]:
# tmask, tprop, _, _, _, _, _ = self.transitions[acode][smask][''][0]
# for i, c in enumerate(theme_cols): data[c].append(smask[i])
# data['species'].append('softwood' if au_table1.loc[int(smask[2])].canfi_species < 1200 else 'hardwood')
# data['using_age_class'].append('FALSE')
# data['min_softwood_age'].append(1)
# data['max_softwood_age'].append(999)
# data['min_hardwood_age'].append(1)
# data['max_hardwood_age'].append(999)
# data['disturbance_type'].append('harvest')
# for i in range(len(theme_cols)): data['to_theme%i' % i].append(tmask[i])
# data['to_%s' % species_classifier_colname].append('softwood' if au_table2.loc[int(tmask[4])].canfi_species < 1200 else 'hardwood')
# data['regen_delay'].append(0)
# data['reset_age'].append(0)
# data['percent'].append(100)
# result = pd.DataFrame(data)
# return result
[docs]
def to_cbm_sit(self, softwood_volume_yname, hardwood_volume_yname, admin_boundary, eco_boundary,
disturbance_type_mapping,
export_csv=False, sit_data_path='', default_last_pass_disturbance='fire', n_yield_vals=100,
include_empty_dtypes=False):
"""
Exports model data in a CBM standard import tool (SIT) data exchange format.
Returns sit_config (JSON-like dict namespace) and sit_tables (dict of pandas.DataFrame objects).
:param str softwood_volume_yname: The yield component name for softwood volume.
:param str hardwood_volume_yname: The yield component name for hardwood volume.
:param str admin_boundary: The administrative boundary for spatial units mapping.
:param str eco_boundary: The ecological boundary for spatial units mapping.
:param dict disturbance_type_mapping: A dictionary containing disturbance type mapping information.
:param bool export_csv: Flag indicating whether to export data to CSV files. Default is False.
:param str sit_data_path: The path to export CSV files. Default is empty string.
:param str default_last_pass_disturbance: The default last pass disturbance type. Default is 'fire'.
:param int n_yield_vals: The number of yield values. Default is 100.
"""
sit_config = {
'mapping_config': {
'nonforest': None,
'species': {
'species_classifier': 'species',
'species_mapping': [
{'user_species': 'softwood', 'default_species': 'Softwood forest type'},
{'user_species': 'hardwood', 'default_species': 'Hardwood forest type'}
]
},
'spatial_units': {
'mapping_mode': 'SingleDefaultSpatialUnit',
'admin_boundary': admin_boundary,
'eco_boundary': eco_boundary},
'disturbance_types': {
'disturbance_type_mapping': disturbance_type_mapping
}
}
}
sit_yield = self._cbm_sit_yield(softwood_volume_yname='swdvol',
hardwood_volume_yname='hwdvol', n_yield_vals=100)
sit_tables = {'sit_classifiers':self._cbm_sit_classifiers(),
'sit_disturbance_types':self._cbm_sit_disturbance_types(),
'sit_age_classes':self._cbm_sit_age_classes(),
'sit_yield':sit_yield,
'sit_inventory':self._cbm_sit_inventory(softwood_volume_yname='swdvol',
hardwood_volume_yname='hwdvol',
include_empty_dtypes=include_empty_dtypes),
'sit_events':self._cbm_sit_events(),
'sit_transitions':self._cbm_sit_transitions()}
return sit_config, sit_tables
if __name__ == '__main__':
pass