diff --git a/src/models/fluidPrice.model.ts b/src/models/fluidPrice.model.ts index c38feff398246475c8653a8d69bdbb69d41cd4b7..68d251432d26c058cd3528c6c04116429a89a885 100644 --- a/src/models/fluidPrice.model.ts +++ b/src/models/fluidPrice.model.ts @@ -5,7 +5,7 @@ export interface FluidPrice { price: number startDate: string endDate: string - UpdatedAt?: string + UpdatedAt: string _id: string _rev?: string _type?: string diff --git a/src/services/fluidsPrices.service.ts b/src/services/fluidsPrices.service.ts index 6eb9945c490cec11c5684dab446a28d13c43d6ee..5a16cf3ae4ac94748ffb5130c05ba5a73f5a531f 100644 --- a/src/services/fluidsPrices.service.ts +++ b/src/services/fluidsPrices.service.ts @@ -78,6 +78,7 @@ export default class FluidPricesService { price: config.coefficient, startDate: config.startDate, endDate: '', + UpdatedAt: '2000-01-01T00:00:00Z', }) }) diff --git a/src/services/queryRunner.service.ts b/src/services/queryRunner.service.ts index 3fffee26dfc4292c518396f10702e59f9fa63443..30dcbdf0c1e0b0e01efeb83740ccb15bb897f0f0 100644 --- a/src/services/queryRunner.service.ts +++ b/src/services/queryRunner.service.ts @@ -255,73 +255,41 @@ export default class QueryRunner { } private getRelevantDoctype(fluidType: FluidType, timeStep: TimeStep) { - let doctype = '' - switch (fluidType) { - case FluidType.ELECTRICITY: - { - switch (timeStep) { - case TimeStep.HALF_AN_HOUR: - doctype = ENEDIS_MINUTE_DOCTYPE - break - case TimeStep.WEEK: - case TimeStep.DAY: - doctype = ENEDIS_DAY_DOCTYPE - break - case TimeStep.MONTH: - doctype = ENEDIS_MONTH_DOCTYPE - break - case TimeStep.YEAR: - doctype = ENEDIS_YEAR_DOCTYPE - break - default: - doctype = ENEDIS_DAY_DOCTYPE - } - } - break - - case FluidType.WATER: - { - switch (timeStep) { - case TimeStep.WEEK: - case TimeStep.DAY: - doctype = EGL_DAY_DOCTYPE - break - case TimeStep.MONTH: - doctype = EGL_MONTH_DOCTYPE - break - case TimeStep.YEAR: - doctype = EGL_YEAR_DOCTYPE - break - default: - doctype = EGL_DAY_DOCTYPE - } - } - break - - case FluidType.GAS: - { - switch (timeStep) { - case TimeStep.WEEK: - case TimeStep.DAY: - doctype = GRDF_DAY_DOCTYPE - break - case TimeStep.MONTH: - doctype = GRDF_MONTH_DOCTYPE - break - case TimeStep.YEAR: - doctype = GRDF_YEAR_DOCTYPE - break - default: - doctype = GRDF_DAY_DOCTYPE - } - } - break - - default: - break + interface DoctypesMappings { + [fluidType: number]: { + [timeStep: number]: string + default: string + } } - - return doctype + const doctypeMappings: DoctypesMappings = { + [FluidType.ELECTRICITY]: { + [TimeStep.HALF_AN_HOUR]: ENEDIS_MINUTE_DOCTYPE, + [TimeStep.WEEK]: ENEDIS_DAY_DOCTYPE, + [TimeStep.DAY]: ENEDIS_DAY_DOCTYPE, + [TimeStep.MONTH]: ENEDIS_MONTH_DOCTYPE, + [TimeStep.YEAR]: ENEDIS_YEAR_DOCTYPE, + default: ENEDIS_DAY_DOCTYPE, + }, + [FluidType.WATER]: { + [TimeStep.WEEK]: EGL_DAY_DOCTYPE, + [TimeStep.DAY]: EGL_DAY_DOCTYPE, + [TimeStep.MONTH]: EGL_MONTH_DOCTYPE, + [TimeStep.YEAR]: EGL_YEAR_DOCTYPE, + default: EGL_DAY_DOCTYPE, + }, + [FluidType.GAS]: { + [TimeStep.WEEK]: GRDF_DAY_DOCTYPE, + [TimeStep.DAY]: GRDF_DAY_DOCTYPE, + [TimeStep.MONTH]: GRDF_MONTH_DOCTYPE, + [TimeStep.YEAR]: GRDF_YEAR_DOCTYPE, + default: GRDF_DAY_DOCTYPE, + }, + } + return ( + doctypeMappings[fluidType]?.[timeStep] || + doctypeMappings[fluidType]?.['default'] || + 'default' + ) } public async fetchFluidData( diff --git a/src/targets/services/fluidsPrices.ts b/src/targets/services/fluidsPrices.ts index 8e2706850f6d7aa6f841abfd28c3250cf4591cb2..cd71bf29c611e5991c0b373624ecf8aa285252a7 100644 --- a/src/targets/services/fluidsPrices.ts +++ b/src/targets/services/fluidsPrices.ts @@ -19,10 +19,6 @@ import { runService } from './service' const logStack = logger.namespace('fluidPrices') -interface PricesProps { - client: Client -} - const getRemotePricesByFluid = async ( client: Client, fluidType: FluidType @@ -37,6 +33,18 @@ const getRemotePricesByFluid = async ( return prices } +/** + * If a price has been updated, set the oldest startDate of the edited price so we can redo aggregation + */ +function updateFirstEditedPrice( + firstEditedPrice: string | null, + remotePrice: FluidPrice +): string { + return firstEditedPrice === null || firstEditedPrice >= remotePrice.startDate + ? remotePrice.startDate + : firstEditedPrice +} + /** * Synchro the remote prices with database and returns a date where we have to relaunch aggregation if a price has been edited in backoffice * @returns {string | null} the oldest startDate @@ -46,62 +54,39 @@ const synchroPricesToUpdate = async ( fluidType: FluidType ): Promise<string | null> => { const fps = new FluidPricesService(client) - const remotePrices = await getRemotePricesByFluid(client, fluidType) - let firstEditedPrice: string | null = null - await Promise.all( - remotePrices.map(async remotePrice => { - try { - const existingPrice = await fps.checkIfPriceExists(remotePrice) - if (!existingPrice) { - logStack('debug', `Price doesn't exist in db, creating a new price`) - // If a price has been updated, set the oldest startDate of the edited price so we can redo aggregation - if ( - firstEditedPrice === null || - firstEditedPrice >= remotePrice.startDate - ) { - firstEditedPrice = remotePrice.startDate - } - await fps.createPrice(remotePrice) - return - } - - if (!existingPrice.UpdatedAt && remotePrice.UpdatedAt) { - // updatedAt key doesn't exist in db - await fps.updatePrice(existingPrice, { - UpdatedAt: remotePrice.UpdatedAt, - }) - return - } else if ( - !existingPrice.UpdatedAt || - !remotePrice.UpdatedAt || - existingPrice.UpdatedAt >= remotePrice.UpdatedAt - ) { - logStack('debug', `Price up to date`) - return - } - logStack('debug', `Price exists in db but not up to date, updating it`) - // If a price has been updated, set the oldest startDate of the edited price so we can redo aggregation - if ( - firstEditedPrice === null || - firstEditedPrice >= remotePrice.startDate - ) { - firstEditedPrice = remotePrice.startDate - } + try { + const remotePrices = await getRemotePricesByFluid(client, fluidType) + let firstEditedPrice: string | null = null + for (const remotePrice of remotePrices) { + const existingPrice = await fps.checkIfPriceExists(remotePrice) + if (!existingPrice) { + logStack('debug', `Price doesn't exist in db, creating a new price`) + firstEditedPrice = updateFirstEditedPrice(firstEditedPrice, remotePrice) + await fps.createPrice(remotePrice) + continue + } - // update this price in db - await fps.updatePrice(existingPrice, { - price: remotePrice.price, - UpdatedAt: remotePrice.UpdatedAt, - startDate: remotePrice.startDate, - endDate: remotePrice.endDate, - }) - } catch (error) { - logStack('error', `Error: ${error}`) - Sentry.captureException(error) + if (existingPrice.UpdatedAt >= remotePrice.UpdatedAt) { + logStack('debug', `Price up to date`) + continue } - }) - ) - return firstEditedPrice + logStack('debug', `Price exists in db but not up to date, updating it`) + // If a price has been updated, set the oldest startDate of the edited price so we can redo aggregation + firstEditedPrice = updateFirstEditedPrice(firstEditedPrice, remotePrice) + + await fps.updatePrice(existingPrice, { + price: remotePrice.price, + UpdatedAt: remotePrice.UpdatedAt, + startDate: remotePrice.startDate, + endDate: remotePrice.endDate, + }) + } + return firstEditedPrice + } catch (error) { + logStack('error', `Error: ${error}`) + Sentry.captureException(error) + return null + } } const price = (item: DataloadEntity): number => { @@ -149,45 +134,45 @@ const aggregatePrices = async ( today: DateTime, fluidType: FluidType ) => { - const tsa = [TimeStep.MONTH, TimeStep.YEAR] + const timeSteps = [TimeStep.MONTH, TimeStep.YEAR] logStack( 'debug', `Aggregation started for fluid: ${fluidType}, from ${firstDate} ` ) - await Promise.all( - tsa.map(async ts => { - let date: DateTime = DateTime.local() - Object.assign(date, firstDate) - try { - do { - const tp = await getTimePeriod(ts, date) - // Get doc for aggregation - const data = await qr.fetchFluidRawDoctype( - tp, - TimeStep.DAY, - fluidType - ) - // Get doc to update - const docToUpdate = await qr.fetchFluidRawDoctype(tp, ts, fluidType) + for (const timeStep of timeSteps) { + let aggregationDate = DateTime.fromObject(firstDate) + try { + do { + const timePeriod = await getTimePeriod(timeStep, aggregationDate) + // Get doc for aggregation + const dayDocuments = await qr.fetchFluidRawDoctype( + timePeriod, + TimeStep.DAY, + fluidType + ) + const docToUpdate = await qr.fetchFluidRawDoctype( + timePeriod, + timeStep, + fluidType + ) + + if (docToUpdate?.data && dayDocuments?.data) { + docToUpdate.data[0].price = dayDocuments.data.map(price).reduce(sum) + } + await cdm.saveDocs(docToUpdate?.data) + // Update date according to timestep + if (timeStep === TimeStep.YEAR) { + aggregationDate = aggregationDate.plus({ year: 1 }).startOf('month') + } else { + aggregationDate = aggregationDate.plus({ month: 1 }).startOf('month') + } + } while (aggregationDate < today) + } catch (error) { + logStack('info', `Error : ${error}`) + Sentry.captureException(error) + } + } - if (docToUpdate?.data && data?.data) { - docToUpdate.data[0].price = data.data.map(price).reduce(sum) - } - // Save updated docs - await cdm.saveDocs(docToUpdate?.data) - // Update date according to timestep - if (ts === TimeStep.YEAR) { - date = date.plus({ year: 1 }).startOf('month') - } else { - date = date.plus({ month: 1 }).startOf('month') - } - } while (date < today) - } catch (error) { - logStack('info', `Error : ${error}`) - Sentry.captureException(error) - } - }) - ) logStack('debug', `Aggregation done`) } @@ -206,7 +191,7 @@ const getDoctypeTypeByFluid = (fluidType: FluidType): string => { throw new Error() } -const getTimeSetByFluid = (fluidType: FluidType): TimeStep[] => { +const getTimeStepsByFluid = (fluidType: FluidType): TimeStep[] => { if (fluidType === FluidType.ELECTRICITY) { return [TimeStep.DAY, TimeStep.HALF_AN_HOUR] } @@ -228,124 +213,117 @@ const applyPrices = async (client: Client, fluidType: FluidType) => { const firstEditedPriceDate = await synchroPricesToUpdate(client, fluidType) const firstDataDate = await cdm.fetchAllFirstDateData([fluidType]) const prices = await fluidsPricesService.getAllPrices() - // Prices data exist - if (prices.length > 0) { - logStack('debug', 'fluidPrices data found') - const firstMinuteData = await cdm.getFirstDataDateFromDoctypeWithPrice( - getDoctypeTypeByFluid(fluidType) - ) - // const firstDoctypeData = await cdm.getFirsDataDateFromDoctype() - // If there is data, update hourly data and daily data - if ( - firstDataDate?.[0] && - (firstMinuteData || firstEditedPriceDate !== null) - ) { - const today = DateTime.now() - const tsa = getTimeSetByFluid(fluidType) - let firstDate: DateTime + if (prices.length === 0) { + logStack('info', 'No fluidesPrices data') + return + } + logStack('debug', 'fluidPrices data found') + const firstMinuteData = await cdm.getFirstDataDateFromDoctypeWithPrice( + getDoctypeTypeByFluid(fluidType) + ) - if (firstMinuteData && firstEditedPriceDate) { - // If there is first data without price and a price edited, set the smallest date - const firstMinuteDataDate = DateTime.fromObject({ - year: firstMinuteData.year, - month: firstMinuteData.month, - day: firstMinuteData.day, - }).setZone('utc', { - keepLocalTime: true, - }) - const formattedFirstEditedPrice = DateTime.fromISO( - firstEditedPriceDate - ).setZone('utc', { - keepLocalTime: true, - }) - // we want to exclude the period with no data if the edited date is smaller than the first data date - firstDate = DateTime.min( - DateTime.max(formattedFirstEditedPrice, firstDataDate[0]), - firstMinuteDataDate - ) - } else if (firstMinuteData) { - firstDate = DateTime.fromObject({ - year: firstMinuteData.year, - month: firstMinuteData.month, - day: firstMinuteData.day, - }).setZone('utc', { - keepLocalTime: true, - }) - } else if (firstEditedPriceDate) { - firstDate = DateTime.max( - DateTime.fromISO(firstEditedPriceDate).setZone('utc', { - keepLocalTime: true, - }), - firstDataDate[0] + // If there is data, update hourly data and daily data + if ( + !firstDataDate?.[0] || + (!firstMinuteData && firstEditedPriceDate === null) + ) { + logStack('info', `No data found for fluid ${fluidType}`) + return + } + const today = DateTime.now() + const timeSteps = getTimeStepsByFluid(fluidType) + let firstDate: DateTime + + if (firstMinuteData && firstEditedPriceDate) { + // If there is first data without price and a price edited, set the smallest date + const firstMinuteDataDate = DateTime.fromObject({ + year: firstMinuteData.year, + month: firstMinuteData.month, + day: firstMinuteData.day, + }).setZone('utc', { + keepLocalTime: true, + }) + const formattedFirstEditedPrice = DateTime.fromISO( + firstEditedPriceDate + ).setZone('utc', { + keepLocalTime: true, + }) + // we want to exclude the period with no data if the edited date is smaller than the first data date + firstDate = DateTime.min( + DateTime.max(formattedFirstEditedPrice, firstDataDate[0]), + firstMinuteDataDate + ) + } else if (firstMinuteData) { + firstDate = DateTime.fromObject({ + year: firstMinuteData.year, + month: firstMinuteData.month, + day: firstMinuteData.day, + }).setZone('utc', { + keepLocalTime: true, + }) + } else if (firstEditedPriceDate) { + firstDate = DateTime.max( + DateTime.fromISO(firstEditedPriceDate).setZone('utc', { + keepLocalTime: true, + }), + firstDataDate[0] + ) + } else { + firstDate = today + } + + // Hourly and daily prices + for (const timeStep of timeSteps) { + let date: DateTime = DateTime.local().setZone('utc', { + keepLocalTime: true, + }) + Object.assign(date, firstDate) + try { + do { + const priceData = await fluidsPricesService.getPrices(fluidType, date) + const timePeriod = await getTimePeriod(timeStep, date) + const data = await qr.fetchFluidRawDoctype( + timePeriod, + timeStep, + fluidType ) - } else { - firstDate = today - } - // Hourly and daily prices - await Promise.all( - tsa.map(async timeStep => { - let date: DateTime = DateTime.local().setZone('utc', { - keepLocalTime: true, + // If lastItem has a price, skip this day (in order to save perf) + const lastItem = data?.data && data.data[data.data.length - 1] + if (lastItem && priceData) { + // if a price has been updated in backoffice re-calculates all price from the firstEditedPriceDate + data?.data.forEach((element: DataloadEntity) => { + element.price = element.load * priceData.price }) - Object.assign(date, firstDate) - try { - do { - // Get price - const priceData = await fluidsPricesService.getPrices( - fluidType, - date - ) - const tp = await getTimePeriod(timeStep, date) - // Get doc to update - const data = await qr.fetchFluidRawDoctype( - tp, - timeStep, - fluidType - ) - - // If lastItem has a price, skip this day (in order to save perf) - const lastItem = data?.data && data.data[data.data.length - 1] - if (lastItem && priceData) { - // if a price has been updated in backoffice re-calculates all price from the firstEditedPriceDate - data?.data.forEach((element: DataloadEntity) => { - element.price = element.load * priceData.price - }) - // Save updated docs - await cdm.saveDocs(data.data) - } - // Update date - if (timeStep === TimeStep.HALF_AN_HOUR) { - date = date.plus({ days: 1 }) - } else { - date = date.plus({ month: 1 }).startOf('month') - } - } while (date < today) - } catch (error) { - logStack('error', `ERROR : ${error} `) - Sentry.captureException(error) - } - }) - ) + await cdm.saveDocs(data.data) + } + // Update date + if (timeStep === TimeStep.HALF_AN_HOUR) { + date = date.plus({ days: 1 }) + } else { + date = date.plus({ month: 1 }).startOf('month') + } + } while (date < today) + } catch (error) { + logStack('error', `ERROR : ${error} `) + Sentry.captureException(error) + } + } - // Call aggregation method - await aggregatePrices(qr, cdm, firstDate, today, fluidType) - } else logStack('info', `No data found for fluid ${fluidType}`) - } else logStack('info', 'No fluidesPrices data') + await aggregatePrices(qr, cdm, firstDate, today, fluidType) } -const processPrices = async ({ client }: PricesProps) => { +const processPrices = async ({ client }: { client: Client }) => { logStack('info', `Processing electricity data...`) - const elec = applyPrices(client, FluidType.ELECTRICITY) + await applyPrices(client, FluidType.ELECTRICITY) logStack('info', `Electricity data done`) logStack('info', `Processing gas data...`) - const gas = applyPrices(client, FluidType.GAS) + await applyPrices(client, FluidType.GAS) logStack('info', `Gas data done`) logStack('info', `Processing water data...`) - const water = applyPrices(client, FluidType.WATER) + await applyPrices(client, FluidType.WATER) logStack('info', `Water data done`) - await Promise.all([elec, gas, water]) logStack('info', `processPrices done`) }