// @ts-check
require('./instrument.js') // Sentry initialization
const {
  BaseKonnector,
  log,
  hydrateAndFilter,
  errors,
  updateOrCreate,
} = require('cozy-konnector-libs')
const soapRequest = require('easy-soap-request')
const moment = require('moment')
require('moment-timezone')
const xml2js = require('xml2js')
const {
  parseSgeXmlData,
  formateDataForDoctype,
  parseTags,
  parseValue,
  parseValueHalfHour,
  parsePointId,
  parseUserOffPeakHours,
} = require('./helpers/parsing')
const {
  consultationMesuresDetailleesMaxPower,
  consultationMesuresDetaillees,
  consulterDonneesTechniquesContractuelles,
} = require('./requests/sge')
const {
  updateBoConsent,
  createBoConsent,
  getBoConsent,
  deleteBoConsent,
} = require('./requests/bo')
const {
  verifyUserIdentity,
  activateContract,
  verifyContract,
  terminateContract,
} = require('./core')
const { getAccount, saveAccountData } = require('./requests/cozy')
const { isLocal } = require('./helpers/env')
const Sentry = require('@sentry/node')
const { catchRequestReject } = require('./helpers/catch')
const { applyPrices, getPrices } = require('./helpers/prices.js')
const { rangeDate } = require('./constants.js')
const {
  aggregateMonthlyLoad,
  filterFirstMonthlyLoad,
  aggregateYearlyLoad,
  filterFirstYearlyLoad,
} = require('./helpers/aggregate.js')

moment.locale('fr') // set the language
moment.tz.setDefault('Europe/Paris') // set the timezone

/** Connector Constants **/
const manualExecution = process.env.COZY_JOB_MANUAL_EXECUTION === 'true'
// Manual executions query 12 months of history, other executions 36 months
let startDate = manualExecution
  ? moment().subtract(12, 'month').add(1, 'day')
  : moment().subtract(36, 'month').add(1, 'day')
let startDateString = startDate.format('YYYY-MM-DD')
const startHalfHourDate = moment().subtract(7, 'day')
const endDate = moment()
const endDateString = endDate.format('YYYY-MM-DD')
const ACCOUNT_ID = isLocal() ? 'default_account_id' : 'enedissgegrandlyon'
const NO_DATA = process.env.NO_DATA === 'true'

module.exports = new BaseKonnector(start)

/**
 * The start function is run by the BaseKonnector instance only when it got all the account
 * information (fields). When you run this connector yourself in "standalone" mode or "dev" mode,
 * the account information come from ./konnector-dev-config.json file
 * cozyParameters are static parameters, independents from the account. Most often, it can be a
 * secret api key.
 *
 * Any error is reported to Sentry, flushed, and then re-thrown to the caller.
 * @param {fields} fields
 * @param {{secret: fields}} cozyParameters
 */
async function start(fields, cozyParameters) {
  try {
    log('info', 'Konnector configuration ...')
    log('info', `isManual execution: ${manualExecution}`)
    if (NO_DATA) {
      log(
        'debug',
        'NO_DATA is enabled, konnector will stop after verifyUserIdentity()'
      )
    }
    // Explicit radix so the point id is always read as a decimal number
    const pointId = parsePointId(parseInt(fields.pointId, 10))
    let baseUrl = fields.wso2BaseUrl
    let apiAuthKey = fields.apiToken
    let contractId = fields.contractId
    let sgeLogin = fields.sgeLogin
    let boToken = fields.boToken
    let boBaseUrl = fields.boBaseUrl
    // When cozyParameters are provided, their secrets replace the account fields
    if (cozyParameters && Object.keys(cozyParameters).length !== 0) {
      log('debug', 'Found COZY_PARAMETERS')
      baseUrl = cozyParameters.secret.wso2BaseUrl
      apiAuthKey = cozyParameters.secret.apiToken
      contractId = cozyParameters.secret.contractId
      sgeLogin = cozyParameters.secret.sgeLogin
      boBaseUrl = cozyParameters.secret.boBaseUrl
      boToken = cozyParameters.secret.boToken
    }

    // Prevent missing configuration
    if (
      !baseUrl ||
      !apiAuthKey ||
      !contractId ||
      !sgeLogin ||
      !boToken ||
      !boBaseUrl
    ) {
      const errorMessage = 'Missing configuration secrets'
      log('error', errorMessage)
      Sentry.captureException(errorMessage, {
        tags: { section: 'start' },
      })
      throw new Error(errors.VENDOR_DOWN)
    }

    /**
     * If it's first start we have to do the following operations:
     * - verify pdl are matching
     * - BO: create backoffice consent
     * - get contract start date and store it
     * - activate half-hour
     * - BO: update consent with service ID
     */
    log('info', 'User Logging...')
    const boUrlSGE = new URL('/api/sge', boBaseUrl).href
    if (isFirstStart(await getAccount(ACCOUNT_ID))) {
      log('info', 'First start...')
      const user = await verifyUserIdentity(
        fields,
        baseUrl,
        apiAuthKey,
        sgeLogin
      )
      exitIfDebug(user)

      let consent = await createBoConsent(
        boUrlSGE,
        boToken,
        pointId,
        user.lastname,
        user.firstname,
        user.address,
        user.postalCode,
        user.inseeCode,
        user.city,
        user.hasBeenThroughSafetyOnBoarding
      )

      const contractStartDate = moment().format('YYYY-MM-DD')
      const contractEndDate = moment()
        .add(1, 'year') // SGE force 1 year duration
        .format('YYYY-MM-DD')

      let serviceId = await verifyContract(
        baseUrl,
        apiAuthKey,
        sgeLogin,
        contractId,
        user.pointId
      )
      if (!serviceId) {
        serviceId = await activateContract(
          baseUrl,
          apiAuthKey,
          sgeLogin,
          contractId,
          user.lastname,
          user.pointId,
          contractStartDate,
          contractEndDate
        ).catch(async err => {
          // Roll back the consent created above when the activation fails
          await deleteBoConsent(boUrlSGE, boToken, consent.ID)
          throw err
        })
      }
      consent = await updateBoConsent(
        boUrlSGE,
        boToken,
        consent,
        serviceId.toString()
      )
      // Save bo id into account
      const accountData = await getAccount(ACCOUNT_ID)

      await saveAccountData(ACCOUNT_ID, {
        ...accountData.data,
        consentId: consent.ID,
        expirationDate: contractEndDate,
        inseeCode: user.inseeCode,
      })
    } else {
      log('info', 'Alternate start...')
      const accountData = await getAccount(ACCOUNT_ID)
      const userConsent = await getBoConsent(
        boUrlSGE,
        boToken,
        accountData.data.consentId
      )
      const user = await verifyUserIdentity(
        fields,
        baseUrl,
        apiAuthKey,
        sgeLogin,
        true,
        accountData.data.inseeCode
      )
      exitIfDebug(user)

      if (!userConsent) {
        const errorMessage = 'No user consent found'
        log('error', errorMessage)
        Sentry.captureException(errorMessage, {
          tags: { section: 'start' },
        })
        throw new Error(errors.VENDOR_DOWN)
      }

      const consentEndDate = Date.parse(userConsent.endDate)
      const today = Date.now()
      // `!user` must be tested first: the name comparison dereferences `user`
      if (
        !user ||
        user.lastname.toLocaleUpperCase() !==
          userConsent.lastname.toLocaleUpperCase() ||
        consentEndDate < today
      ) {
        await deleteConsent(
          userConsent,
          baseUrl,
          apiAuthKey,
          sgeLogin,
          contractId,
          pointId,
          boUrlSGE,
          boToken,
          consentEndDate < today
        )
      }
    }
    log('info', 'Successfully logged in')

    await gatherData(baseUrl, apiAuthKey, sgeLogin, pointId, boBaseUrl)

    log('info', 'Konnector success')
  } catch (error) {
    const errorMessage = `SGE konnector encountered an error. Response data: ${JSON.stringify(
      error.message
    )}`
    Sentry.captureMessage(errorMessage, {
      tags: {
        section: 'start',
      },
    })
    // Make sure pending Sentry events are sent before the error propagates
    await Sentry.flush()
    throw error
  }
}

/**
 * Delete User Consent
 *
 * Terminates the contract, deletes the BO consent, and then always throws:
 * - VENDOR_DOWN when the consent carries no service id,
 * - USER_ACTION_NEEDED_OAUTH_OUTDATED when the consent is expired,
 * - TERMS_VERSION_MISMATCH otherwise.
 * @param {Consent} userConsent
 * @param {string} baseUrl
 * @param {string} apiAuthKey
 * @param {string} sgeLogin
 * @param {string} contractId
 * @param {string} pointId
 * @param {string} boBaseUrl
 * @param {string} boToken
 * @param {boolean} isConsentExpired
 */
async function deleteConsent(
  userConsent,
  baseUrl,
  apiAuthKey,
  sgeLogin,
  contractId,
  pointId,
  boBaseUrl,
  boToken,
  isConsentExpired
) {
  log('error', `Invalid or not found consent for user`)
  Sentry.captureMessage(`Invalid or not found consent for user`)
  if (userConsent.serviceID) {
    await terminateContract(
      baseUrl,
      apiAuthKey,
      sgeLogin,
      contractId,
      pointId,
      userConsent.serviceID
    )
    await deleteBoConsent(boBaseUrl, boToken, userConsent.ID || 0)
  } else {
    const errorMessage = `No service id retrieved from BO`
    log('error', errorMessage)
    Sentry.captureException(errorMessage, {
      tags: { section: 'start' },
    })
    throw new Error(errors.VENDOR_DOWN)
  }
  if (isConsentExpired) {
    Sentry.captureException('Consent expired', {
      tags: { section: 'start' },
    })
    throw new Error(errors.USER_ACTION_NEEDED_OAUTH_OUTDATED)
  }
  throw new Error(errors.TERMS_VERSION_MISMATCH)
}

/**
 * Main method for gathering data
 *
 * Fetches the BO prices, then runs the daily, max power, half-hour and
 * off-peak hours fetchers one after the other.
 * @param {string} baseUrl
 * @param {string} apiAuthKey
 * @param {string} sgeLogin
 * @param {string} pointId
 * @param {string} boBaseUrl
 */
async function gatherData(baseUrl, apiAuthKey, sgeLogin, pointId, boBaseUrl) {
  log('info', 'Querying data...')
  const measuresUrl = new URL(
    '/enedis_SGE_ConsultationMesuresDetaillees_v3/1.0',
    baseUrl
  ).href
  const contractUrl = new URL(
    '/enedis_SGE_ConsultationDonneesTechniquesContractuelles/1.0',
    baseUrl
  ).href

  log('info', 'Fetching BO prices')
  const prices = await getPrices(boBaseUrl)

  await getDailyData(measuresUrl, apiAuthKey, sgeLogin, pointId, prices)
  await getMaxPowerData(measuresUrl, apiAuthKey, sgeLogin, pointId)
  await getDataHalfHour(measuresUrl, apiAuthKey, sgeLogin, pointId, prices)
  await getOffPeakHours(contractUrl, apiAuthKey, sgeLogin, pointId)

  log('info', 'Querying data: done')
}

/**
 * Parse an XML response body and wait for its handler to settle.
 *
 * The handlers built by processDailyData / processMaxPowerData /
 * processHalfHourData are async node-style callbacks. Awaiting them here
 * (instead of handing them to the callback form of xml2js.parseString)
 * ensures the data is stored before the caller continues, and lets a
 * handler failure propagate to the caller instead of becoming an
 * unhandled rejection.
 * @param {string} xml - XML document to parse
 * @param {(value: string, name: string) => any} valueProcessor - xml2js value processor
 * @param {(err: Error | null, result: any) => Promise<void>} handler
 * @returns {Promise<void>}
 */
async function parseAndProcess(xml, valueProcessor, handler) {
  let result
  try {
    result = await xml2js.parseStringPromise(xml, {
      tagNameProcessors: [parseTags],
      valueProcessors: [valueProcessor],
      explicitArray: false,
    })
  } catch (err) {
    // Keep the node-style contract: parse errors go to the handler
    await handler(err, undefined)
    return
  }
  await handler(null, result)
}

/**
 * Get hour data
 *
 * Requests the technical contract data, extracts the off-peak hours and
 * saves them in the account data. When they cannot be extracted, the
 * `offPeakHours` key is removed from the account data instead.
 * @param {string} url
 * @param {string} apiAuthKey
 * @param {string} userLogin
 * @param {string} pointId
 */
async function getOffPeakHours(url, apiAuthKey, userLogin, pointId) {
  log('info', 'Fetching off-peak hours')
  const sgeHeaders = {
    'Content-Type': 'text/xml;charset=UTF-8',
    apikey: apiAuthKey,
  }

  // NOTE(review): on rejection the catch handler returns the rejection value,
  // so `response` may be undefined below — confirm what easy-soap-request
  // rejects with. The same pattern is used by the other fetchers of this file.
  const { response } = await soapRequest({
    url: url,
    headers: sgeHeaders,
    xml: consulterDonneesTechniquesContractuelles(pointId, userLogin, false),
  }).catch(err => {
    log('error', 'consulterDonneesTechniquesContractuelles')
    log('error', err)
    Sentry.captureException(
      `consulterDonneesTechniquesContractuelles: ${err}`,
      {
        tags: { section: 'getOffPeakHour' },
        extra: {
          pointId: pointId,
        },
      }
    )
    return err
  })

  catchRequestReject(response.body)

  const result = await xml2js.parseStringPromise(response.body, {
    tagNameProcessors: [parseTags],
    valueProcessors: [parseValue],
    explicitArray: false,
  })

  try {
    const offPeakHours = parseUserOffPeakHours(result)
    log(
      'debug',
      `Found off-peak hours : ${offPeakHours}, store them in account data`
    )
    const accountData = await getAccount(ACCOUNT_ID)
    await saveAccountData(ACCOUNT_ID, {
      ...accountData.data,
      offPeakHours,
    })
  } catch (error) {
    log('debug', 'Off-peak hours not found, remove them from account data')
    let accountData = await getAccount(ACCOUNT_ID)
    delete accountData.data.offPeakHours
    await saveAccountData(ACCOUNT_ID, {
      ...accountData.data,
    })
  }
}

/**
 * Get daily data
 *
 * Requests the measures between startDateString and endDateString, then
 * waits for processDailyData to finish handling the parsed response.
 * @param {string} url
 * @param {string} apiAuthKey
 * @param {string} userLogin
 * @param {string} pointId
 * @param {Price[] | null} prices
 */
async function getDailyData(url, apiAuthKey, userLogin, pointId, prices) {
  log('info', 'Fetching daily data')
  const sgeHeaders = {
    'Content-Type': 'text/xml;charset=UTF-8',
    apikey: apiAuthKey,
  }

  const { response } = await soapRequest({
    url: url,
    headers: sgeHeaders,
    xml: consultationMesuresDetaillees(
      pointId,
      userLogin,
      startDateString,
      endDateString,
      'ENERGIE',
      'EA'
    ),
  }).catch(err => {
    log('error', 'consultationMesuresDetaillees')
    log('error', err)
    Sentry.captureException(`consultationMesuresDetaillees: ${err}`, {
      tags: { section: 'getData' },
    })
    return err
  })

  catchRequestReject(response.body)

  await parseAndProcess(response.body, parseValue, processDailyData(prices))
}

/**
 * Get Max power data
 *
 * Requests the max power measures between startDateString and
 * endDateString, then waits for processMaxPowerData to finish handling the
 * parsed response.
 * @param {string} url
 * @param {string} apiAuthKey
 * @param {string} userLogin
 * @param {string} pointId
 */
async function getMaxPowerData(url, apiAuthKey, userLogin, pointId) {
  log('info', 'Fetching Max Power data')
  const sgeHeaders = {
    'Content-Type': 'text/xml;charset=UTF-8',
    apikey: apiAuthKey,
  }

  const { response } = await soapRequest({
    url: url,
    headers: sgeHeaders,
    xml: consultationMesuresDetailleesMaxPower(
      pointId,
      userLogin,
      startDateString,
      endDateString
    ),
  }).catch(err => {
    log('error', 'getMaxPowerData')
    log('error', err)
    Sentry.captureException(`getMaxPowerData: ${err}`, {
      tags: { section: 'getMaxPowerData' },
    })
    return err
  })

  catchRequestReject(response.body)

  await parseAndProcess(response.body, parseValue, processMaxPowerData())
}

/**
 * Get half-hour data
 *
 * Requests one 7-day window per iteration, each window shifted 7 days
 * further in the past, and waits for processHalfHourData to finish handling
 * a window before requesting the next one.
 * @param {string} url
 * @param {string} apiAuthKey
 * @param {string} userLogin
 * @param {string} pointId
 * @param {Price[] | null} prices
 */
async function getDataHalfHour(url, apiAuthKey, userLogin, pointId, prices) {
  log('info', 'Fetching half-hour data')
  const sgeHeaders = {
    'Content-Type': 'text/xml;charset=UTF-8',
    apikey: apiAuthKey,
  }

  // If manual execution, retrieve only 1 week otherwise retrieve 4 weeks
  const MAX_HISTO = manualExecution ? 1 : 4
  for (let i = 0; i < MAX_HISTO; i++) {
    log('info', 'launch process with history')
    const incrementedStartDateString = moment(startHalfHourDate)
      .subtract(7 * i, 'day')
      .format('YYYY-MM-DD')
    const incrementedEndDateString = moment(endDate)
      .subtract(7 * i, 'day')
      .format('YYYY-MM-DD')

    const { response } = await soapRequest({
      url: url,
      headers: sgeHeaders,
      xml: consultationMesuresDetaillees(
        pointId,
        userLogin,
        incrementedStartDateString,
        incrementedEndDateString,
        'COURBE',
        'PA'
      ),
    }).catch(err => {
      log('error', 'consultationMesuresDetaillees half-hour')
      log('error', err)
      Sentry.captureException(
        `consultationMesuresDetaillees half-hour: ${err}`,
        {
          tags: { section: 'getDataHalfHour' },
        }
      )
      return err
    })

    catchRequestReject(response.body)

    await parseAndProcess(
      response.body,
      parseValueHalfHour,
      processHalfHourData(prices)
    )
  }
}

/**
 * Build the node-style handler that stores daily data and the monthly and
 * yearly aggregates derived from it.
 *
 * The handler re-throws a parsing error it receives; any error raised while
 * processing or storing is only logged as a warning.
 * @param {Price[] | null} prices
 * @returns {(err: Error | null, result: any) => Promise<void>}
 */
function processDailyData(prices) {
  return async (err, result) => {
    if (err) {
      log('error', err)
      Sentry.captureException('error while processing daily data')
      throw err
    }
    // Return only needed part of info
    log('info', `Processing ${rangeDate.day.doctype} data`)
    try {
      const data = parseSgeXmlData(result)
      let dailyData = await formateDataForDoctype(data)

      if (prices && prices.length > 0) {
        log('info', 'Found BO prices, applying them to enedis data')
        dailyData = await applyPrices(dailyData, prices)
      }
      const filterDayKeys = [...rangeDate.day.keys, 'load']
      if (prices) filterDayKeys.push('price')
      const daysToUpdate = await hydrateAndFilter(
        dailyData,
        rangeDate.day.doctype,
        { keys: filterDayKeys }
      )
      log('debug', 'Store enedis daily load data')
      await updateOrCreate(
        daysToUpdate,
        rangeDate.day.doctype,
        rangeDate.day.keys
      )

      const { year: firstYear, month: firstMonth } = dailyData[0]

      log('debug', 'Aggregate enedis monthly load data')
      const monthlyLoads = aggregateMonthlyLoad(dailyData)

      log('debug', 'Filter first month aggregate if already in database')
      const filteredMonthlyLoads = await filterFirstMonthlyLoad(
        firstMonth,
        firstYear,
        monthlyLoads
      )
      const filterMonthKeys = [...rangeDate.month.keys, 'load']
      if (prices) filterMonthKeys.push('price')
      const monthsToUpdate = await hydrateAndFilter(
        filteredMonthlyLoads,
        rangeDate.month.doctype,
        { keys: filterMonthKeys }
      )
      log('debug', 'Store aggregated enedis monthly load data')
      await updateOrCreate(
        monthsToUpdate,
        rangeDate.month.doctype,
        rangeDate.month.keys
      )

      log('debug', 'Aggregate enedis yearly load data')
      const yearlyLoads = aggregateYearlyLoad(monthlyLoads)

      log('debug', 'Filter first year aggregate if already in database')
      const filteredYearlyLoads = await filterFirstYearlyLoad(
        firstYear,
        yearlyLoads
      )
      const filterYearKeys = [...rangeDate.year.keys, 'load']
      if (prices) filterYearKeys.push('price')
      const yearsToUpdate = await hydrateAndFilter(
        filteredYearlyLoads,
        rangeDate.year.doctype,
        { keys: filterYearKeys }
      )
      log('debug', 'Store aggregated enedis yearly load data')
      await updateOrCreate(
        yearsToUpdate,
        rangeDate.year.doctype,
        rangeDate.year.keys
      )
    } catch (e) {
      log('warn', `Unknown error ${e}`)
    }
  }
}

/**
 * Build the node-style handler that stores max power data.
 *
 * The handler re-throws a parsing error it receives; any error raised while
 * processing or storing is only logged as a warning.
 * @returns {(err: Error | null, result: any) => Promise<void>}
 */
function processMaxPowerData() {
  return async (err, result) => {
    if (err) {
      log('error', err)
      Sentry.captureException('error while processing max power data')
      throw err
    }
    // Return only needed part of info
    log('info', `Processing ${rangeDate.maxPower.doctype} data`)
    try {
      const data = parseSgeXmlData(result)
      const maxPowerData = await formateDataForDoctype(data)
      const filterMaxPowerKeys = [...rangeDate.maxPower.keys, 'load']
      const maxPowerToUpdate = await hydrateAndFilter(
        maxPowerData,
        rangeDate.maxPower.doctype,
        { keys: filterMaxPowerKeys }
      )
      log('debug', 'Store Enedis max power load data')
      await updateOrCreate(
        maxPowerToUpdate,
        rangeDate.maxPower.doctype,
        rangeDate.maxPower.keys
      )
    } catch (e) {
      log('warn', `Unknown error ${e}`)
    }
  }
}

/**
 * Build the node-style handler that stores half-hour data.
 *
 * The handler re-throws a parsing error it receives; any error raised while
 * processing or storing is reported to Sentry and logged as a warning.
 * @param {Price[] | null} prices
 * @returns {(err: Error | null, result: any) => Promise<void>}
 */
function processHalfHourData(prices) {
  return async (err, result) => {
    if (err) {
      log('error', err)
      Sentry.captureException('error while processing half-hour data')
      throw err
    }
    // Return only needed part of info
    log('info', `Processing ${rangeDate.minute.doctype} data`)
    try {
      const data = parseSgeXmlData(result)
      let minuteData = await formateDataForDoctype(data)

      if (
        (rangeDate.minute.doctype === 'com.grandlyon.enedis.day' ||
          rangeDate.minute.doctype === 'com.grandlyon.enedis.minute') &&
        prices &&
        prices.length > 0
      ) {
        log('info', 'Found BO prices, applying them to enedis data')
        minuteData = await applyPrices(minuteData, prices)
      }

      const filterMinuteKeys = [...rangeDate.minute.keys, 'load']
      if (prices) filterMinuteKeys.push('price')
      const minutesToUpdate = await hydrateAndFilter(
        minuteData,
        rangeDate.minute.doctype,
        { keys: filterMinuteKeys }
      )
      log('debug', 'Store Enedis minute load data')
      await updateOrCreate(
        minutesToUpdate,
        rangeDate.minute.doctype,
        rangeDate.minute.keys
      )
    } catch (e) {
      // The parsed response is not always a SOAP fault: read the fault string
      // defensively and fall back to the caught error so this handler itself
      // cannot throw.
      const faultString = result?.Envelope?.Body?.Fault?.faultstring
      const errorMessage = `No half-hour activated. Issue: ${
        faultString ?? e
      }`
      Sentry.captureMessage(errorMessage, {
        tags: { section: 'processData' },
      })
      log('warn', errorMessage)
    }
  }
}

/**
 * Tell whether the konnector runs for the first time, i.e. whether the
 * account has no `data.consentId` yet.
 * @returns {boolean}
 */
function isFirstStart(account) {
  if (account?.data?.consentId) {
    log('info', 'Konnector not first start')
    return false
  }
  log('info', 'Konnector first start')
  return true
}

/**
 * Check if konnector is launched in local with NO_DATA option
 * If so, logs result from verifyUserIdentity() and stops the konnector before getting any data
 * @param {User} user - The user object to log
 */
function exitIfDebug(user) {
  if (NO_DATA) {
    log(
      'debug',
      `Stopping konnector before getting data, user found from verifyUserIdentity():`
    )
    log('debug', user)
    process.exit()
  }
}