const {
  BaseKonnector,
  log,
  addData,
  hydrateAndFilter,
  errors,
  cozyClient,
} = require('cozy-konnector-libs')
const getAccountId = require('./helpers/getAccountId')
const moment = require('moment')
const rp = require('request-promise')
require('moment-timezone')

moment.locale('fr') // set the language
moment.tz.setDefault('Europe/Paris') // set the timezone

/*** Connector Constants ***/
// True only when the job environment flags the run as manually triggered.
const manualExecution = process.env.COZY_JOB_MANUAL_EXECUTION === 'true'
// Daily data window: 12 months back for a manual run, 32 months otherwise.
const startDailyDate = manualExecution
  ? moment().subtract(12, 'month')
  : moment().subtract(32, 'month')
const startDailyDateString = startDailyDate.format('YYYY-MM-DD')
// Load curve window: the last 7 days.
const startLoadDate = moment().subtract(7, 'day')
const startLoadDateString = startLoadDate.format('YYYY-MM-DD')
// Stored load data older than this date means the history was already loaded.
const checkHistoryDate = moment().subtract(8, 'day')
const endDate = moment()
const endDateString = endDate.format('YYYY-MM-DD')
const baseUrl = 'https://gw.prd.api.enedis.fr'
const dailyDataURL = `${baseUrl}/v4/metering_data/daily_consumption`
const loadCurveURL = `${baseUrl}/v4/metering_data/consumption_load_curve`

/**
 * The start function is run by the BaseKonnector instance only when it got all the account
 * information (fields). When you run this connector yourself in "standalone" mode or "dev" mode,
 * the account information come from ./konnector-dev-config.json file
 * cozyParameters are static parameters, independents from the account. Most often, it can be a
 * secret api key.
 *
 * Flow: resolve the usage point id, fetch and store the daily data, aggregate
 * it per month and per year, then process the load curve data. On a 403 the
 * stack is asked once to refresh the token and the whole flow is replayed.
 *
 * @param {Object} fields
 * @param {string} fields.access_token - access token
 * @param {string} fields.refresh_token - refresh token
 * @param {Object} cozyParameters - cozy parameters
 * @param {boolean} doRetry - whether we should use the refresh token or not
 * @throws errors.USER_ACTION_NEEDED_OAUTH_OUTDATED when a 403 is received and
 *   there is no refresh token, or when the refresh request itself fails
 * @throws errors.LOGIN_FAILED when a 403 is received and doRetry is false
 * @throws the original error for anything that is not a 403
 */
async function start(fields, cozyParameters, doRetry = true) {
  log('info', 'Starting the enedis konnector')
  log('info', `Manual execution: ${manualExecution}`)
  const accountId = getAccountId()
  let usage_point_id = ''
  try {
    const { access_token } = fields
    // `this` is guarded so that a call without a bound konnector instance
    // falls through to the `fields` branch instead of raising a TypeError.
    const oauthResults =
      this && this._account && this._account.oauth_callback_results
    if (oauthResults && oauthResults.usage_points_id) {
      // The OAuth callback provides a comma separated list: keep the first id
      usage_point_id = oauthResults.usage_points_id.split(',')[0]
    } else if (fields.usage_point_id) {
      // In case of refresh token, we retrieve the usage point id from the fields
      usage_point_id = fields.usage_point_id
    } else {
      log('error', 'no usage_point_id found')
      throw errors.USER_ACTION_NEEDED_OAUTH_OUTDATED
    }

    log('info', 'Fetching enedis daily data')
    const fetchedDailyData = await getDailyData(access_token, usage_point_id)
    log('info', 'Process enedis daily data')
    const processedDailyData = await processData(
      fetchedDailyData,
      'com.grandlyon.enedis.day',
      ['year', 'month', 'day']
    )
    log('info', 'Agregate enedis daily data for month and year')
    await agregateMonthAndYearData(processedDailyData)
    log('info', 'Process enedis load data')
    await startLoadDataProcess(access_token, usage_point_id)
  } catch (err) {
    const isForbidden = err.statusCode === 403 || err.code === 403
    if (!isForbidden) {
      log('error', 'caught an unexpected error')
      log('error', err.message)
      throw err
    }
    if (!fields.refresh_token) {
      log('info', 'no refresh token found')
      throw errors.USER_ACTION_NEEDED_OAUTH_OUTDATED
    }
    if (!doRetry) {
      log('error', `Error during authentication: ${err.message}`)
      throw errors.LOGIN_FAILED
    }

    log('info', 'asking refresh from the stack')
    let body
    try {
      body = await cozyClient.fetchJSON(
        'POST',
        `/accounts/enedisgrandlyon/${accountId}/refresh`
      )
    } catch (refreshErr) {
      log('info', `Error during refresh ${refreshErr.message}`)
      throw errors.USER_ACTION_NEEDED_OAUTH_OUTDATED
    }
    log('info', 'refresh response')
    // NOTE(review): this writes the whole refresh response to the logs, and
    // the new access token is read from that same object just below.
    log('info', JSON.stringify(body))
    fields.access_token = body.attributes.oauth.access_token
    fields.usage_point_id = usage_point_id
    // Keep the `this` binding for the retry, and disable any further retry.
    return start.call(this, fields, cozyParameters, false)
  }
}

/**
 * Build the request-promise options shared by the metering data endpoints
 * @param {string} url - endpoint url, without query string
 * @param {string} token - access token sent as a Bearer token
 * @param {string} usagePointID - usage point to query
 * @param {string} startDay - value of the `start` query parameter
 * @param {string} endDay - value of the `end` query parameter
 * @returns {Object} options object for `rp`
 */
function buildMeteringRequest(url, token, usagePointID, startDay, endDay) {
  return {
    method: 'GET',
    uri: `${url}?start=${startDay}&end=${endDay}&usage_point_id=${usagePointID}`,
    headers: {
      Accept: 'application/json',
      Authorization: `Bearer ${token}`,
    },
  }
}

/**
 * Retrieve data from the API
 * Format: { value: "Wh", "date": "YYYY-MM-DD" }
 * @param {string} token - access token
 * @param {string} usagePointID - usage point to query
 * @returns {Promise<*>} the response as resolved by request-promise
 */
async function getDailyData(token, usagePointID) {
  const dataRequest = buildMeteringRequest(
    dailyDataURL,
    token,
    usagePointID,
    startDailyDateString,
    endDateString
  )
  const response = await rp(dataRequest)
  return response
}

/**
 * Check if history is loaded
 * If not, call several time the api to retrieve 1 month of history for load data
 * If yes only call once the api
 * Nothing is fetched when the consent check fails.
 * @param {string} token - access token
 * @param {string} usagePointID - usage point to query
 */
async function startLoadDataProcess(token, usagePointID) {
  log('info', 'Check consent for user')
  const isConsent = await checkConsentForLoadCurve(
    token,
    usagePointID,
    startLoadDateString,
    endDateString
  )
  if (!isConsent) {
    return
  }

  log('info', 'Check history')
  const isHistory = await isHistoryLoaded('com.grandlyon.enedis.minute')
  log('info', `isHistory: ${isHistory}`)
  if (isHistory || manualExecution) {
    log('info', 'launch process without history')
    await launchLoadDataProcess(
      token,
      usagePointID,
      startLoadDateString,
      endDateString
    )
    return
  }

  log('info', 'launch process with history')
  // Four windows, each one shifted 7 more days into the past
  for (let week = 0; week < 4; week++) {
    const windowStartString = moment(startLoadDate)
      .subtract(7 * week, 'day')
      .format('YYYY-MM-DD')
    const windowEndString = moment(endDate)
      .subtract(7 * week, 'day')
      .format('YYYY-MM-DD')
    await launchLoadDataProcess(
      token,
      usagePointID,
      windowStartString,
      windowEndString
    )
  }
}

/**
 * Request API and check return code
 * Return true or false
 *
 * false is returned for: a 400 whose message contains 'ADAM-ERR0075', any 403,
 * and a 404 whose message contains 'no_data_found'. Any other error is rethrown.
 * @param {string} token - access token
 * @param {string} usagePointID - usage point to query
 * @param {string} _startDate - value of the `start` query parameter
 * @param {string} _endDate - value of the `end` query parameter
 * @returns {Promise<boolean>}
 */
async function checkConsentForLoadCurve(
  token,
  usagePointID,
  _startDate,
  _endDate
) {
  const dataRequest = buildMeteringRequest(
    loadCurveURL,
    token,
    usagePointID,
    _startDate,
    _endDate
  )

  try {
    await rp(dataRequest)
    log('info', 'Consent found for load curve')
    return true
  } catch (err) {
    const hasStatus = status =>
      err.statusCode === status || err.code === status
    // A non-string message never matches, so the error is rethrown untouched
    // instead of being replaced by a TypeError.
    const message = typeof err.message === 'string' ? err.message : ''

    if (hasStatus(400) && message.includes('ADAM-ERR0075')) {
      log('info', 'No consent for load curve')
      return false
    }
    if (hasStatus(403)) {
      log('info', 'No consent for load curve')
      return false
    }
    if (hasStatus(404) && message.includes('no_data_found')) {
      log('info', 'Handling half-hour error on connection')
      return false
    }
    throw err
  }
}

/**
 * Function checking if the history is loaded
 * The history counts as loaded when at least one stored document of the
 * doctype is dated before checkHistoryDate.
 * @param {string} doctype - doctype to look into
 * @returns {Promise<boolean>}
 */
async function isHistoryLoaded(doctype) {
  log('debug', doctype, 'Retrieve data')
  const result = await cozyClient.data.findAll(doctype)
  if (!result || result.length === 0) {
    return false
  }
  return result.some(el => {
    const elDate = moment({
      year: el.year,
      // Documents store a 1-based month while moment({...}) expects 0-based
      month: typeof el.month === 'number' ? el.month - 1 : el.month,
      day: el.day,
      hour: el.hour,
      minute: el.minute,
    })
    return elDate.isBefore(checkHistoryDate)
  })
}

/**
 * Launch process to handle load data
 * Fetch the load curve for the given period and, when the response is not
 * empty, process it into the 'com.grandlyon.enedis.minute' doctype.
 * @param {string} token - access token
 * @param {string} usagePointID - usage point to query
 * @param {string} _startLoadDate - value of the `start` query parameter
 * @param {string} _endDate - value of the `end` query parameter
 */
async function launchLoadDataProcess(
  token,
  usagePointID,
  _startLoadDate,
  _endDate
) {
  log('info', 'Fetching enedis load data')
  const fetchedLoadData = await getLoadData(
    token,
    usagePointID,
    _startLoadDate,
    _endDate
  )
  if (!fetchedLoadData || fetchedLoadData.length === 0) {
    log('info', 'No consent or data for load curve')
    return
  }
  log('info', 'Process enedis load data')
  await processData(fetchedLoadData, 'com.grandlyon.enedis.minute', [
    'year',
    'month',
    'day',
    'hour',
    'minute',
  ])
}

/**
 * Retrieve data from the API
 * Format: { value: "W", "date": "YYYY-MM-DD hh:mm:ss" }
 */
async function getLoadData(token, usagePointID, _startDate, _endDate) {
  const dataRequest = {
    method: 'GET',
    uri: `${loadCurveURL}?start=${_startDate}&end=${_endDate}&usage_point_id=${usagePointID}`,
    headers: {
      Accept: 'application/json',
      Authorization: `Bearer ${token}`,
    },
  }
  const response = await rp(dataRequest)
  return response
}

/**
 * Parse data
 * Remove existing data from DB using hydrateAndFilter
 * Store filtered data
 * Return the list of filtered data
 * @param {string} data - JSON text exposing `meter_reading.interval_reading`
 * @param {string} doctype - doctype the data is stored into
 * @param {string[]} filterKeys - keys identifying an existing document
 * @returns {Promise<Object[]>} the documents kept by hydrateAndFilter
 */
async function processData(data, doctype, filterKeys) {
  const parsedData = JSON.parse(data)
  const intervalData = parsedData.meter_reading.interval_reading
  const formatedData = await formateData(intervalData, doctype)
  // Remove data for existing days into the DB
  const filteredData = await hydrateAndFilter(formatedData, doctype, {
    keys: filterKeys,
  })
  // Store new day data
  await storeData(filteredData, doctype, filterKeys)
  return filteredData
}

/**
 * Agregate data from daily data to monthly and yearly data
 * Does nothing when data is empty or missing.
 * @param {Object[]} data - documents exposing `year`, `month` and `load`
 */
async function agregateMonthAndYearData(data) {
  if (!data || data.length === 0) {
    return
  }

  // Sum year and month values into object with year or year-month as keys
  const monthData = {}
  const yearData = {}
  for (const element of data) {
    const monthKey = `${element.year}-${element.month}`
    if (monthKey in monthData) {
      monthData[monthKey] += element.load
    } else {
      monthData[monthKey] = element.load
    }

    if (element.year in yearData) {
      yearData[element.year] += element.load
    } else {
      yearData[element.year] = element.load
    }
  }

  // Agregation for Month data
  const agregatedMonthData = await buildAgregatedData(
    monthData,
    'com.grandlyon.enedis.month'
  )
  await storeData(agregatedMonthData, 'com.grandlyon.enedis.month', [
    'year',
    'month',
  ])

  // Agregation for Year data
  const agregatedYearData = await buildAgregatedData(
    yearData,
    'com.grandlyon.enedis.year'
  )
  await storeData(agregatedYearData, 'com.grandlyon.enedis.year', ['year'])
}

/**
 * Save data in the right doctype db and prevent duplicated keys
 * @param {Object[]} data - documents to store
 * @param {string} doctype - doctype the data is stored into
 * @param {string[]} filterKeys - keys identifying an existing document
 * @returns {Promise<*>} the result of addData
 */
async function storeData(data, doctype, filterKeys) {
  log('debug', doctype, 'Store into')
  const filteredDocuments = await hydrateAndFilter(data, doctype, {
    keys: filterKeys,
  })
  return await addData(filteredDocuments, doctype)
}

/**
 * Format data for DB storage
 * Remove bad data
 *
 * Records whose value equals -2 are dropped from the result, so that no
 * `undefined` entry is handed over to the storage or aggregation steps.
 * For the 'com.grandlyon.enedis.minute' doctype the value is halved and the
 * date is moved back by 30 minutes.
 * @param {Object[]} data - records exposing `date` and `value`
 * @param {string} doctype - doctype the data is formatted for
 * @returns {Promise<Object[]>} documents with load, year, month, day, hour, minute
 */
async function formateData(data, doctype) {
  log('info', 'Formating data')
  const isLoadCurve = doctype === 'com.grandlyon.enedis.minute'
  return data
    .filter(record => Number(record.value) !== -2)
    .map(record => {
      let date = moment(record.date, 'YYYY/MM/DD h:mm:ss')
      const load = isLoadCurve ? record.value / 2 : record.value
      if (isLoadCurve) {
        date = date.subtract(30, 'minute')
      }
      return {
        // presumably a Wh to kWh conversion - TODO confirm
        load: load / 1000,
        year: date.year(),
        month: date.month() + 1, // moment months are 0-based, stored 1-based
        day: date.date(),
        hour: date.hour(),
        minute: date.minute(),
      }
    })
}

/**
 * Retrieve and remove old data for a specific doctype
 * Return an Array of agregated data
 * The load of the removed documents is added to the new entry.
 * @param {Object} data - summed loads, keyed by "YYYY" or "YYYY-MM"
 * @param {string} doctype - aggregated doctype
 * @returns {Promise<Object[]>}
 */
async function buildAgregatedData(data, doctype) {
  const agregatedData = []
  for (const [key, value] of Object.entries(data)) {
    const entry = await buildDataFromKey(doctype, key, value)
    const oldValue = await resetInProgressAggregatedData(entry, doctype)
    entry.load += oldValue
    agregatedData.push(entry)
  }
  return agregatedData
}

/**
 * Format an entry for DB storage
 * using key and value
 * For year doctype: key = "YYYY"
 * For month doctype: key = "YYYY-MM"
 * For any other doctype the key is split on '-' into year, month, day, hour
 * @param {string} doctype - aggregated doctype
 * @param {string} key - aggregation key
 * @param {number} value - summed load, rounded here to 4 decimals
 * @returns {Promise<Object>} document with load, year, month, day, hour, minute
 */
async function buildDataFromKey(doctype, key, value) {
  const parts = key.split('-')
  let year = key
  let month = 1
  let day = 0
  let hour = 0
  if (doctype === 'com.grandlyon.enedis.month') {
    year = parts[0]
    month = parts[1]
  } else if (doctype !== 'com.grandlyon.enedis.year') {
    year = parts[0]
    month = parts[1]
    day = parts[2]
    hour = parts[3]
  }
  return {
    load: Math.round(value * 10000) / 10000,
    year: parseInt(year, 10),
    month: parseInt(month, 10),
    day: parseInt(day, 10),
    hour: parseInt(hour, 10),
    minute: 0,
  }
}

/**
 * Function handling special case.
 * The temporary aggregated data need to be removed in order for the most recent one to be saved.
 * ex for com.grandlyon.enedis.year :
 * { load: 76.712, year: 2020, ... } need to be replaced by
 * { load: 82.212, year: 2020, ... } after enedis data reprocess
 * @param {Object} data - entry exposing the year, month, day, hour to match
 * @param {string} doctype - aggregated doctype
 * @returns {Promise<number>} sum of the loads of the deleted documents
 */
async function resetInProgressAggregatedData(data, doctype) {
  // /!\ Warning: cannot use mongo queries because not supported for dev by cozy-konnectors-libs
  log('debug', doctype, 'Remove aggregated data for')
  const result = await cozyClient.data.findAll(doctype)
  if (!result || result.length === 0) {
    return 0.0
  }

  // Loose equality is kept on purpose: the type of the fields held by the
  // stored documents is not visible from here - TODO confirm before tightening
  let isSamePeriod
  if (doctype === 'com.grandlyon.enedis.year') {
    // Yearly case
    isSamePeriod = el => el.year == data.year
  } else if (doctype === 'com.grandlyon.enedis.month') {
    // Monthly case
    isSamePeriod = el => el.year == data.year && el.month == data.month
  } else {
    // Hourly case
    isSamePeriod = el =>
      el.year == data.year &&
      el.month == data.month &&
      el.day == data.day &&
      el.hour == data.hour
  }

  // Remove data
  let sum = 0.0
  for (const doc of result.filter(isSamePeriod)) {
    sum += doc.load
    log('debug', doc, 'Removing this entry for ' + doctype)
    await cozyClient.data.delete(doctype, doc)
  }
  return sum
}

module.exports = new BaseKonnector(start)