Skip to content
Snippets Groups Projects
Commit a4aad4f1 authored by Rémi PAILHAREY's avatar Rémi PAILHAREY
Browse files

Merge branch '551-erreur-du-service-fluidprices' into 'dev'

fix(fluidprices): removed parallelization

See merge request !1037
parents 396bd20c c0f665be
No related branches found
No related tags found
2 merge requests!10622.7 Release,!1037fix(fluidprices): removed parallelization
......@@ -5,7 +5,7 @@ export interface FluidPrice {
price: number
startDate: string
endDate: string
UpdatedAt?: string
UpdatedAt: string
_id: string
_rev?: string
_type?: string
......
......@@ -78,6 +78,7 @@ export default class FluidPricesService {
price: config.coefficient,
startDate: config.startDate,
endDate: '',
UpdatedAt: '2000-01-01T00:00:00Z',
})
})
......
......@@ -255,73 +255,41 @@ export default class QueryRunner {
}
private getRelevantDoctype(fluidType: FluidType, timeStep: TimeStep) {
let doctype = ''
switch (fluidType) {
case FluidType.ELECTRICITY:
{
switch (timeStep) {
case TimeStep.HALF_AN_HOUR:
doctype = ENEDIS_MINUTE_DOCTYPE
break
case TimeStep.WEEK:
case TimeStep.DAY:
doctype = ENEDIS_DAY_DOCTYPE
break
case TimeStep.MONTH:
doctype = ENEDIS_MONTH_DOCTYPE
break
case TimeStep.YEAR:
doctype = ENEDIS_YEAR_DOCTYPE
break
default:
doctype = ENEDIS_DAY_DOCTYPE
}
}
break
case FluidType.WATER:
{
switch (timeStep) {
case TimeStep.WEEK:
case TimeStep.DAY:
doctype = EGL_DAY_DOCTYPE
break
case TimeStep.MONTH:
doctype = EGL_MONTH_DOCTYPE
break
case TimeStep.YEAR:
doctype = EGL_YEAR_DOCTYPE
break
default:
doctype = EGL_DAY_DOCTYPE
}
}
break
case FluidType.GAS:
{
switch (timeStep) {
case TimeStep.WEEK:
case TimeStep.DAY:
doctype = GRDF_DAY_DOCTYPE
break
case TimeStep.MONTH:
doctype = GRDF_MONTH_DOCTYPE
break
case TimeStep.YEAR:
doctype = GRDF_YEAR_DOCTYPE
break
default:
doctype = GRDF_DAY_DOCTYPE
}
}
break
default:
break
interface DoctypesMappings {
[fluidType: number]: {
[timeStep: number]: string
default: string
}
}
const doctypeMappings: DoctypesMappings = {
[FluidType.ELECTRICITY]: {
[TimeStep.HALF_AN_HOUR]: ENEDIS_MINUTE_DOCTYPE,
[TimeStep.WEEK]: ENEDIS_DAY_DOCTYPE,
[TimeStep.DAY]: ENEDIS_DAY_DOCTYPE,
[TimeStep.MONTH]: ENEDIS_MONTH_DOCTYPE,
[TimeStep.YEAR]: ENEDIS_YEAR_DOCTYPE,
default: ENEDIS_DAY_DOCTYPE,
},
[FluidType.WATER]: {
[TimeStep.WEEK]: EGL_DAY_DOCTYPE,
[TimeStep.DAY]: EGL_DAY_DOCTYPE,
[TimeStep.MONTH]: EGL_MONTH_DOCTYPE,
[TimeStep.YEAR]: EGL_YEAR_DOCTYPE,
default: EGL_DAY_DOCTYPE,
},
[FluidType.GAS]: {
[TimeStep.WEEK]: GRDF_DAY_DOCTYPE,
[TimeStep.DAY]: GRDF_DAY_DOCTYPE,
[TimeStep.MONTH]: GRDF_MONTH_DOCTYPE,
[TimeStep.YEAR]: GRDF_YEAR_DOCTYPE,
default: GRDF_DAY_DOCTYPE,
},
}
return doctype
return (
doctypeMappings[fluidType]?.[timeStep] ||
doctypeMappings[fluidType]?.['default'] ||
'default'
)
}
public async fetchFluidData(
......
......@@ -19,10 +19,6 @@ import { runService } from './service'
const logStack = logger.namespace('fluidPrices')
interface PricesProps {
client: Client
}
const getRemotePricesByFluid = async (
client: Client,
fluidType: FluidType
......@@ -37,6 +33,18 @@ const getRemotePricesByFluid = async (
return prices
}
/**
* If a price has been updated, set the oldest startDate of the edited price so we can redo aggregation
*/
function updateFirstEditedPrice(
firstEditedPrice: string | null,
remotePrice: FluidPrice
): string {
return firstEditedPrice === null || firstEditedPrice >= remotePrice.startDate
? remotePrice.startDate
: firstEditedPrice
}
/**
* Synchro the remote prices with database and returns a date where we have to relaunch aggregation if a price has been edited in backoffice
* @returns {string | null} the oldest startDate
......@@ -46,62 +54,39 @@ const synchroPricesToUpdate = async (
fluidType: FluidType
): Promise<string | null> => {
const fps = new FluidPricesService(client)
try {
const remotePrices = await getRemotePricesByFluid(client, fluidType)
let firstEditedPrice: string | null = null
await Promise.all(
remotePrices.map(async remotePrice => {
try {
for (const remotePrice of remotePrices) {
const existingPrice = await fps.checkIfPriceExists(remotePrice)
if (!existingPrice) {
logStack('debug', `Price doesn't exist in db, creating a new price`)
// If a price has been updated, set the oldest startDate of the edited price so we can redo aggregation
if (
firstEditedPrice === null ||
firstEditedPrice >= remotePrice.startDate
) {
firstEditedPrice = remotePrice.startDate
}
firstEditedPrice = updateFirstEditedPrice(firstEditedPrice, remotePrice)
await fps.createPrice(remotePrice)
return
continue
}
if (!existingPrice.UpdatedAt && remotePrice.UpdatedAt) {
// updatedAt key doesn't exist in db
await fps.updatePrice(existingPrice, {
UpdatedAt: remotePrice.UpdatedAt,
})
return
} else if (
!existingPrice.UpdatedAt ||
!remotePrice.UpdatedAt ||
existingPrice.UpdatedAt >= remotePrice.UpdatedAt
) {
if (existingPrice.UpdatedAt >= remotePrice.UpdatedAt) {
logStack('debug', `Price up to date`)
return
continue
}
logStack('debug', `Price exists in db but not up to date, updating it`)
// If a price has been updated, set the oldest startDate of the edited price so we can redo aggregation
if (
firstEditedPrice === null ||
firstEditedPrice >= remotePrice.startDate
) {
firstEditedPrice = remotePrice.startDate
}
firstEditedPrice = updateFirstEditedPrice(firstEditedPrice, remotePrice)
// update this price in db
await fps.updatePrice(existingPrice, {
price: remotePrice.price,
UpdatedAt: remotePrice.UpdatedAt,
startDate: remotePrice.startDate,
endDate: remotePrice.endDate,
})
}
return firstEditedPrice
} catch (error) {
logStack('error', `Error: ${error}`)
Sentry.captureException(error)
return null
}
})
)
return firstEditedPrice
}
const price = (item: DataloadEntity): number => {
......@@ -149,45 +134,45 @@ const aggregatePrices = async (
today: DateTime,
fluidType: FluidType
) => {
const tsa = [TimeStep.MONTH, TimeStep.YEAR]
const timeSteps = [TimeStep.MONTH, TimeStep.YEAR]
logStack(
'debug',
`Aggregation started for fluid: ${fluidType}, from ${firstDate} `
)
await Promise.all(
tsa.map(async ts => {
let date: DateTime = DateTime.local()
Object.assign(date, firstDate)
for (const timeStep of timeSteps) {
let aggregationDate = DateTime.fromObject(firstDate)
try {
do {
const tp = await getTimePeriod(ts, date)
const timePeriod = await getTimePeriod(timeStep, aggregationDate)
// Get doc for aggregation
const data = await qr.fetchFluidRawDoctype(
tp,
const dayDocuments = await qr.fetchFluidRawDoctype(
timePeriod,
TimeStep.DAY,
fluidType
)
// Get doc to update
const docToUpdate = await qr.fetchFluidRawDoctype(tp, ts, fluidType)
const docToUpdate = await qr.fetchFluidRawDoctype(
timePeriod,
timeStep,
fluidType
)
if (docToUpdate?.data && data?.data) {
docToUpdate.data[0].price = data.data.map(price).reduce(sum)
if (docToUpdate?.data && dayDocuments?.data) {
docToUpdate.data[0].price = dayDocuments.data.map(price).reduce(sum)
}
// Save updated docs
await cdm.saveDocs(docToUpdate?.data)
// Update date according to timestep
if (ts === TimeStep.YEAR) {
date = date.plus({ year: 1 }).startOf('month')
if (timeStep === TimeStep.YEAR) {
aggregationDate = aggregationDate.plus({ year: 1 }).startOf('month')
} else {
date = date.plus({ month: 1 }).startOf('month')
aggregationDate = aggregationDate.plus({ month: 1 }).startOf('month')
}
} while (date < today)
} while (aggregationDate < today)
} catch (error) {
logStack('info', `Error : ${error}`)
Sentry.captureException(error)
}
})
)
}
logStack('debug', `Aggregation done`)
}
......@@ -206,7 +191,7 @@ const getDoctypeTypeByFluid = (fluidType: FluidType): string => {
throw new Error()
}
const getTimeSetByFluid = (fluidType: FluidType): TimeStep[] => {
const getTimeStepsByFluid = (fluidType: FluidType): TimeStep[] => {
if (fluidType === FluidType.ELECTRICITY) {
return [TimeStep.DAY, TimeStep.HALF_AN_HOUR]
}
......@@ -228,21 +213,26 @@ const applyPrices = async (client: Client, fluidType: FluidType) => {
const firstEditedPriceDate = await synchroPricesToUpdate(client, fluidType)
const firstDataDate = await cdm.fetchAllFirstDateData([fluidType])
const prices = await fluidsPricesService.getAllPrices()
// Prices data exist
if (prices.length > 0) {
if (prices.length === 0) {
logStack('info', 'No fluidesPrices data')
return
}
logStack('debug', 'fluidPrices data found')
const firstMinuteData = await cdm.getFirstDataDateFromDoctypeWithPrice(
getDoctypeTypeByFluid(fluidType)
)
// const firstDoctypeData = await cdm.getFirsDataDateFromDoctype()
// If there is data, update hourly data and daily data
if (
firstDataDate?.[0] &&
(firstMinuteData || firstEditedPriceDate !== null)
!firstDataDate?.[0] ||
(!firstMinuteData && firstEditedPriceDate === null)
) {
logStack('info', `No data found for fluid ${fluidType}`)
return
}
const today = DateTime.now()
const tsa = getTimeSetByFluid(fluidType)
const timeSteps = getTimeStepsByFluid(fluidType)
let firstDate: DateTime
if (firstMinuteData && firstEditedPriceDate) {
......@@ -284,23 +274,17 @@ const applyPrices = async (client: Client, fluidType: FluidType) => {
}
// Hourly and daily prices
await Promise.all(
tsa.map(async timeStep => {
for (const timeStep of timeSteps) {
let date: DateTime = DateTime.local().setZone('utc', {
keepLocalTime: true,
})
Object.assign(date, firstDate)
try {
do {
// Get price
const priceData = await fluidsPricesService.getPrices(
fluidType,
date
)
const tp = await getTimePeriod(timeStep, date)
// Get doc to update
const priceData = await fluidsPricesService.getPrices(fluidType, date)
const timePeriod = await getTimePeriod(timeStep, date)
const data = await qr.fetchFluidRawDoctype(
tp,
timePeriod,
timeStep,
fluidType
)
......@@ -312,7 +296,6 @@ const applyPrices = async (client: Client, fluidType: FluidType) => {
data?.data.forEach((element: DataloadEntity) => {
element.price = element.load * priceData.price
})
// Save updated docs
await cdm.saveDocs(data.data)
}
// Update date
......@@ -326,26 +309,21 @@ const applyPrices = async (client: Client, fluidType: FluidType) => {
logStack('error', `ERROR : ${error} `)
Sentry.captureException(error)
}
})
)
}
// Call aggregation method
await aggregatePrices(qr, cdm, firstDate, today, fluidType)
} else logStack('info', `No data found for fluid ${fluidType}`)
} else logStack('info', 'No fluidesPrices data')
}
const processPrices = async ({ client }: PricesProps) => {
const processPrices = async ({ client }: { client: Client }) => {
logStack('info', `Processing electricity data...`)
const elec = applyPrices(client, FluidType.ELECTRICITY)
await applyPrices(client, FluidType.ELECTRICITY)
logStack('info', `Electricity data done`)
logStack('info', `Processing gas data...`)
const gas = applyPrices(client, FluidType.GAS)
await applyPrices(client, FluidType.GAS)
logStack('info', `Gas data done`)
logStack('info', `Processing water data...`)
const water = applyPrices(client, FluidType.WATER)
await applyPrices(client, FluidType.WATER)
logStack('info', `Water data done`)
await Promise.all([elec, gas, water])
logStack('info', `processPrices done`)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment