# Source code for recharge.time_series_manager

# ===============================================================================
# Copyright 2016 dgketchum
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

import os
from datetime import datetime, timedelta

from numpy import array, nan, loadtxt, append, zeros
from pandas import DataFrame, notnull, to_numeric, concat

from app.paths import paths


def load_df(path):
    """Read an ETRM time-series extract csv into a DataFrame.

    :param path: path to csv file (first row is a header; first column is a
        date in '%Y-%m-%d' or '%Y/%m/%d' format, remaining seven columns are
        numeric)
    :return: DataFrame of ETRM time-series input, indexed by datetime, with
        columns ['kcb', 'rg', 'etrs', 'min_temp', 'max_temp', 'temp', 'precip']
    """
    print('reading in csv: {}'.format(path))
    csv = loadtxt(path, dtype=str, delimiter=',')
    # extracts should have headers; drop the header row
    csv = csv[1:]
    try:
        new_ind = [datetime.strptime(row[0], '%Y-%m-%d') for row in csv]
    except ValueError:
        # BUG FIX: the original sliced csv[1:] again here, dropping the first
        # data row and yielding an index one element shorter than the data.
        new_ind = [datetime.strptime(row[0], '%Y/%m/%d') for row in csv]
    arr = array(csv[:, 1:], dtype=float)
    cols = ['kcb', 'rg', 'etrs', 'min_temp', 'max_temp', 'temp', 'precip']
    df = DataFrame(arr, index=new_ind, columns=cols)
    return df
def get_etrm_time_series(input_root, dict_):
    """Read pre-extracted data out of a formatted csv and attach it to site dicts.

    Use recharge.point_extract_utility.py to do the extract.

    :param input_root: path to a folder of csv files
    :param dict_: dict of point locations; each value has a 'Name' key matched
        against the csv filename stem, and gets an 'etrm' DataFrame added

    csv will be in the following format:
    ['kcb', 'rg', 'etrs', 'min_temp', 'max_temp', 'temp', 'precip']
    """
    csv_list = [filename for filename in os.listdir(input_root) if filename.endswith('.csv')]
    print('etrm extract csv list: {}'.format(csv_list))
    for path in csv_list:
        # filename stem is the site name used to match against dict_ entries
        name = os.path.splitext(path)[0]
        df = load_df(os.path.join(input_root, path))
        if dict_:
            # .items() instead of Py2-only .iteritems() for Py2/Py3 compatibility
            for key, val in dict_.items():
                if val['Name'] == name:
                    print('updating {} number {} with etrm inputs df'.format(name, key))
                    dict_[key]['etrm'] = df
def amf_obs_time_series(dict_, save_cleaned_data_path=False, complete_days_only=False, close_threshold=0.20,
                        return_low_err=False):
    """Analyze, clean and return dict of AmeriFlux sites and relevant data.

    :param dict_: dict object of AmeriFlux IDs as keys, nested dict with 'Name', 'Coords' etc
    :param save_cleaned_data_path: path to save location of output (False to skip saving)
    :param complete_days_only: return only full days of valid data
    :param close_threshold: threshold of energy-balance closure error tolerated
    :param return_low_err: return dataframe of only low energy balance closure error
    :return: dict_ with an 'AMF_Data' DataFrame added to each site's nested dict

    Raw AmeriFlux columns referenced by index in arr_cols:
    # (year, dtime, H, LE, FG, RN, RG, RGin, RGout)
    # H = sensible heat flux
    # LE = latent heat flux
    # FG = soil heat flux
    # RN = net radiation
    # RG = incoming shortwave
    # RGout = outgoing shortwave
    # RGL = incoming longwave
    # RGLout = outgoing longwave
    """
    path = paths.amf_sites  # this doesn't work; overwritten with a hard-coded path below

    def day_fraction_to_hr_min(fractional_day, year):
        """Convert an AmeriFlux fractional day-of-year (e.g. 100.134) to a datetime.

        :param fractional_day: e.g. 100.134
        :param year: calendar year the fraction belongs to
        :return: datetime
        """

        def ext(d, scalar):
            # Split a decimal string into its integer part and its fractional
            # part scaled by `scalar` (24 for hours, 60 for minutes).
            # BUG FIX: original did a bare d.split('.'), which raised
            # ValueError on whole-number inputs (e.g. '100'); partition
            # tolerates a missing fractional part.
            d = str(d)
            a, _, b = d.partition('.')
            aa = int(a)
            bb = float('.{}'.format(b)) if b else 0.0
            return aa, bb * scalar

        dec = str(fractional_day)
        day, timepart = ext(dec, 24)
        hour, minpart = ext(timepart, 60)
        minutes = int(minpart)
        # snap float round-off back onto the half-hour boundaries
        if minutes == 29:
            minutes = 30
        elif minutes == 59:
            minutes = 0
        # NOTE(review): day 1 maps to Jan 2 here (Jan 1 + timedelta(days=1));
        # confirm the AmeriFlux DOY convention — behavior kept as original.
        tup = datetime(year, 1, 1) + timedelta(days=day, hours=hour, minutes=minutes)
        return tup

    arr_cols = [0, 2, 12, 14, 28, 30, 33, 34, 35]
    subset = ['year', 'day', 'H', 'LE', 'RN', 'RG', 'RGout', 'RGL', 'RGLout']
    columns = ['H', 'LE', 'RN', 'RG', 'RGout', 'RGL', 'RGLout']

    # .items() instead of Py2-only .iteritems() for Py2/Py3 compatibility
    for key, val in dict_.items():
        amf_name = val['Name']
        # TODO: replace this hard-coded path with paths.amf_sites once it works
        path = os.path.join('/Users/dcadol/Documents/ResearchProjects/FocusedRechargeModel/',
                            'ETRM_inputs_Ameriflux', 'ameriflux_sites')
        folder = os.path.join(path, 'AMF_Data', amf_name)
        folder_contents = os.listdir(folder)
        print("this is the folder contents: {}".format(folder_contents))
        csv_list = [os.path.join(folder, item) for item in folder_contents]
        print("this is the new list: {}".format(csv_list))
        print('attempting to fetch headers: {}'.format(subset))

        # stack all files of raw records into one array
        amf_data = None
        for item in csv_list:
            p = os.path.join(folder, item)
            csv = loadtxt(p, dtype=str, skiprows=3, delimiter=',', usecols=arr_cols)
            if amf_data is None:
                amf_data = csv
            else:
                amf_data = append(amf_data, csv, axis=0)

        new_ind = [day_fraction_to_hr_min(float(row[1]), int(row[0])) for row in amf_data]
        amf_data = amf_data[:, 2:]  # drop year/day columns, now encoded in the index
        df = DataFrame(amf_data, index=new_ind, columns=columns)
        print('You have {} rows of --RAW-- data from {}'.format(df.shape[0], amf_name))

        # drop rows with the AmeriFlux missing-value sentinel or other NA values
        df[df == '-9999'] = nan
        df = df[notnull(df)]
        df = df.apply(to_numeric)
        df.dropna(axis=0, how='any', inplace=True)
        print('You have {} rows of FlUX data from {}'.format(df.shape[0], amf_name))

        # Find all complete days (48 records) with no NULL values
        if complete_days_only:
            df = df.groupby(lambda xx: xx.date())
            print('df grouped: {}'.format(df))
            df = df.aggregate(lambda xx: sum(xx) if len(xx) > 23 else nan)
            df.dropna(axis=0, how='any', inplace=True)

        # and convert energy to MJ
        # BUG FIX: the original mutated the series yielded by iteritems(),
        # which is not guaranteed to write back into the frame; assign
        # the scaled columns explicitly instead.
        df[columns] = df[columns] * (0.0864 / 48)

        # calculate energy balance error
        # BUG FIX: the original assigned into rows from iterrows(), which are
        # copies, so the calculated columns silently stayed at zero; use
        # vectorized column assignment so results actually land in the frame.
        rn = df['RN']
        turbulent = df['LE'] + df['H']
        df['rad_err'] = (rn - (df['RG'] - df['RGout'] + df['RGL'] - df['RGLout'])).abs() / rn
        df['en_bal_err'] = (rn - turbulent) / rn
        df['rad_minus_sens_heat'] = (rn - turbulent) * 0.0864 / 48
        df['amf_ET'] = df['LE'] / 2.45  # convert from MJ/(step * m**2) to mm water

        df_low_err = df[df['en_bal_err'] <= close_threshold]
        print('You have {} DAYS of CLEAN RN/LE/H/RAD data from {}'.format(df.shape[0], amf_name))
        print('The mean energy balance closure error is: {}'.format(df['en_bal_err'].mean()))
        print('You have {} DAYS of [0.0 < CLOSURE ERROR < {}] data from {}'.format(len(df_low_err),
                                                                                   close_threshold, amf_name))

        if save_cleaned_data_path:
            p = os.path.join(save_cleaned_data_path, 'ameriflux_sites', 'AMF_ETRM_output',
                             '{}_cleaned_all.csv'.format(amf_name))
            df.to_csv(p)
            p = os.path.join(save_cleaned_data_path, '{}_cleaned_lowErr.csv'.format(amf_name))
            df_low_err.to_csv(p)

        # BUG FIX: the original assigned df_low_err in BOTH branches, making
        # return_low_err a no-op; the else branch now stores the full frame.
        if return_low_err:
            val['AMF_Data'] = df_low_err
        else:
            val['AMF_Data'] = df

    return dict_
if __name__ == '__main__':
    pass

# ============= EOF =============================================