Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions Control/lib/kafka/adapters/odc/odcDeviceEventAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
*/

const { OdcDeviceInfoAdapter } = require('../../../adapters/OdcDeviceInfoAdapter.js');
const { SourceEventTypes } = require('../../enums/SourceEventsTypes.enum.js');

/**
* @typedef {Object} deviceStateChanged
* @typedef {OdcDeviceInfo} deviceStateChanged
*
*
* @example
* @example of RUNNING as received in the payload of the `odc.deviceStateChanged` event on integrated_service.odc topic
* {
* "partitionId": "2uvML7dXYm7",
* "ddsSessionId": "64a39ff4-ee70-4a03-b2c4-3ed41c1bd5a2",
Expand All @@ -31,6 +32,21 @@ const { OdcDeviceInfoAdapter } = require('../../../adapters/OdcDeviceInfoAdapter
* "expendable": false,
* "rmsjobid": "6606"
* }
*
* @example of ERROR as received in the payload of the `odc.deviceStateChanged` event on integrated_service.odc topic
* {
* "partitionId": "2zqJdVsaHwL",
* "ddsSessionId": "2ab25eb0-2de1-49cc-852c-8b0342096229",
* "ddsSessionStatus": "RUNNING",
* "state": "ERROR",
* "ecsState": "ERROR",
* "taskId": "807896542787881827",
* "path": "main/RecoGroupMi100/RecoCollectionMi100_0/pvertex-track-matching_t1_reco1_0",
* "ignored": true,
* "host": "epn323.internal",
* "expendable": false,
* "rmsjobid": "unknown"
* }
*/

/**
Expand All @@ -46,6 +62,7 @@ exports.odcDeviceEventAdapter = (generalIntegratedServiceEvent) => {
const odcDevice = OdcDeviceInfoAdapter.toEntity(payload);

return {
source: SourceEventTypes.ODC,
environmentId,
error,
timestamp,
Expand Down
2 changes: 2 additions & 0 deletions Control/lib/kafka/adapters/taskEventAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
const { getTaskShortName } = require('../../adapters/task/getTaskShortName.js');
const { TaskState } = require('../../common/taskState.enum.js');
const { TaskStatus } = require('../../common/taskStatus.enum.js');
const { SourceEventTypes } = require('../enums/SourceEventsTypes.enum.js');

/**
* Adapter for event messages received on run topic
Expand All @@ -36,6 +37,7 @@ exports.taskEventAdapter = ({ taskEvent }) => {
} = taskEvent;

return {
source: SourceEventTypes.ECS,
id: taskid,
taskId: taskid,
name: getTaskShortName(name),
Expand Down
20 changes: 20 additions & 0 deletions Control/lib/kafka/enums/SourceEventsTypes.enum.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

/**
 * Identifies which system produced a task-level event (ECS or ODC), so that the
 * origin can be told apart in the cache and when the event is emitted.
 * The object is frozen: its members cannot be added, removed or reassigned.
 * @readonly
 * @enum {string}
 */
const SourceEventTypes = Object.freeze({
  ECS: 'ECS',
  ODC: 'ODC',
});

exports.SourceEventTypes = SourceEventTypes;
1 change: 1 addition & 0 deletions Control/lib/services/Environment.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class EnvironmentService {
environmentInfo.events = [...cachedEnvironment.events];
environmentInfo.isDeploying = cachedEnvironment.isDeploying;
environmentInfo.deploymentError = cachedEnvironment.deploymentError;
environmentInfo.firstTaskInError = cachedEnvironment.firstTaskInError;
}
return environmentInfo;
}
Expand Down
18 changes: 14 additions & 4 deletions Control/lib/services/environment/EnvironmentCache.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,27 @@ class EnvironmentCacheService {
* * Heartbeat calls (GetEnvironment/GetEnvironments) - which will NOT contain `isDeploying` and `deploymentError` properties
* * Cache caught events - which should contain `isDeploying` and `deploymentError` properties
* @param {string} id - the id of the environment to be updated
* @param {EnvironmentInfo} environment - the new environment information to be set
* @param {Partial<EnvironmentInfo>} environment - the new environment information to be set
* @returns {void}
*/
addOrUpdateEnvironment(environment, shouldBroadcast = false) {
const { id } = environment;
if (this._environments.has(id)) {
const cachedEnvironment = this._environments.get(id);
const { events = [] } = cachedEnvironment;
const {isDeploying, deploymentError } = cachedEnvironment;
/**
* @param {EnvironmentInfo} cachedEnvironment - the environment information currently stored in cache for the environment with the given id
* @param {boolean} cachedEnvironment.isDeploying - the information if the environment is being deployed
* @param {string} cachedEnvironment.deploymentError - the error message if the environment deployment failed
* @param {TaskEvent|OdcDeviceInfoEvent} cachedEnvironment.firstTaskInError - the first task event in error for the environment, which can be either a FLP task or an ODC device state change
*/
const { isDeploying, deploymentError, firstTaskInError } = cachedEnvironment;
const updatedEnvironment = Object.assign({}, cachedEnvironment, environment);
updatedEnvironment.events = [...events];
updatedEnvironment.isDeploying = isDeploying;
updatedEnvironment.deploymentError = deploymentError;
updatedEnvironment.firstTaskInError = firstTaskInError;

this._environments.set(id, updatedEnvironment);
} else {
this._environments.set(id, { ...environment, events: environment.events ?? [] });
Expand Down Expand Up @@ -195,10 +203,11 @@ class EnvironmentCacheService {
*/
_handleFirstTaskInError(environmentId, event) {
if (
(event.state === TaskState.ERROR || event.state === TaskState.ERROR_CRITICAL)
(event.state === TaskState.ERROR_CRITICAL)
&& this._environments.has(environmentId)
&& !this._environments.get(environmentId).firstTaskInError
) {
this._logger.warnMessage(`Environment ${environmentId} has a first task in critical error: ${event.id}`);
const environment = JSON.parse(JSON.stringify(this._environments.get(environmentId)));
environment.firstTaskInError = event;
this._environments.set(environmentId, environment);
Expand Down Expand Up @@ -236,6 +245,7 @@ class EnvironmentCacheService {

if (
state === EnvironmentState.CONFIGURED &&
transition?.name === EnvironmentTransitionType.CONFIGURE &&
transition?.status === EcsOperationAndStepStatus.DONE_OK
) {
// Once the environment is configured and ongoing transition is done, we can set the isDeploying to false
Expand All @@ -253,9 +263,9 @@ class EnvironmentCacheService {
this.addOrUpdateEnvironment(cachedEnvironment, false);

if (
state === EnvironmentState.DONE &&
transition?.name === EnvironmentTransitionType.DESTROY &&
transition?.status === EcsOperationAndStepStatus.DONE_OK &&
state === EnvironmentState.DONE &&
!cachedEnvironment.deploymentError
) {
// That is, if the environment successfully ended the DESTROY transition
Expand Down
1 change: 1 addition & 0 deletions Control/lib/typedefs/TaskEvent.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* TaskEvent type definition as parsed following the received message from the ECS Kafka task topic
* The parsing is done based on the object received from ECS in `events.proto` definition
*
* @property {SourceEventTypes} source - the source of the event, in this case ECS
* @property {String} id - task id, unique
* @property {String} taskId - task id, unique
* @property {String} name - task name, based on the of the task class and adapted in short form
Expand Down
1 change: 1 addition & 0 deletions Control/lib/typedefs/odc/OdcDeviceInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* This is parsed object by ECS and not the same as the one sent by ODC to ECS. For example:
* * ODC sends 'id' as uint64 but ECS parses it to 'taskId' as string
*
* @property {SourceEventTypes} source - the source of the event, in this case ODC
* @property {String} taskId - ODC 'id' but renamed by ECS to 'taskId'
* @property {String} state
* @property {String} epnState
Expand Down
1 change: 1 addition & 0 deletions Control/lib/typedefs/odc/OdcDeviceInfoEvent.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* This is parsed object by ECS and not the same as the one sent by ODC to ECS. For example:
* * ODC sends 'id' as uint64 but ECS parses it to 'taskId' as string
*
* @property {String} source - has the value 'ODC' to identify the source of the event
* @property {String} taskId - ODC 'id' but renamed by ECS to 'taskId'
* @property {String} state
* @property {String} epnState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const UNKNOWN = 'UNKNOWN';
export const environmentComponentsSummary = (environmentInfo) => {
const odcState = environmentInfo?.hardware?.epn?.info?.state ?? UNKNOWN;
const ddsState = environmentInfo?.hardware?.epn?.info?.ddsSessionStatus ?? UNKNOWN;
const { currentTransition = undefined, state } = environmentInfo;
const { currentTransition = undefined, state, firstTaskInError } = environmentInfo;

const odcStateStyle = ODC_STATE_COLOR[odcState] ? `.${ODC_STATE_COLOR[odcState]}` : '';
const ddsStateStyle = ODC_STATE_COLOR[ddsState] ? `.${ODC_STATE_COLOR[ddsState]}` : '';
Expand All @@ -39,11 +39,17 @@ export const environmentComponentsSummary = (environmentInfo) => {
? `.${ALIECS_TRANSITION_COLOR[currentTransition] ? ALIECS_TRANSITION_COLOR[currentTransition] : ''}`
: `.${ALIECS_STATE_COLOR[state] ? ALIECS_STATE_COLOR[state] : ''}`,
};
return miniCard(_getTitle(currentTransition), [
h('.flex-column', [
h(`${ecsData.style}`, ecsData.info),
h(`${odcStateStyle}`, 'ODC state: ', odcState),
h(`${ddsStateStyle}`, 'DDS state: ', ddsState),

return h('.flex-row.g2', [
miniCard(_getTitle(currentTransition), [
h('.flex-column', [
h(`${ecsData.style}`, ecsData.info),
h(`${odcStateStyle}`, 'ODC state: ', odcState),
h(`${ddsStateStyle}`, 'DDS state: ', ddsState),
]),
]),
firstTaskInError && miniCard(h('h5.danger','First Task In Critical Error'), [
_firstTaskInErrorDisplay(firstTaskInError)
]),
]);
};
Expand All @@ -64,3 +70,52 @@ const _getTitle = (currentTransition) =>
h('h5.flex-column.flex-center', 'Components State')
]
);

/**
 * @private
 * Builds the display of the first task in error. The event is rendered with the ODC
 * layout when its `source` is 'ODC' and with the ECS task layout in every other case.
 * A nullish event is treated as an empty one, matching the `{}` default used by the
 * per-source helpers, so the component never throws while rendering.
 * @param {TaskEvent | OdcDeviceInfoEvent} taskEvent - the task event with error information
 * @returns {vnode} - column listing the source followed by the source-specific details
 */
const _firstTaskInErrorDisplay = (taskEvent) => {
  // Normalise once so that both the source label and the helpers see the same object
  const event = taskEvent ?? {};
  const { source } = event;

  return h('.flex-column.danger', [
    h('span', `Source: ${source}`),
    // 'ODC' is the value of the server-side SourceEventTypes.ODC enum member
    ...(source === 'ODC'
      ? _odcDeviceEventInErrorDisplay(event)
      : _ecsTaskEventInErrorDisplay(event)),
  ]);
};

/**
 * @private
 * Builds the detail lines for an ECS task event in error: one `span` per field,
 * in the order ID, Name, Host, Status. A field missing from the event is rendered
 * as the text `undefined`.
 * @param {TaskEvent} [taskEvent={}] - the task event with error information
 * @returns {Array<vnode>} - list of `span` elements describing the task event
 */
const _ecsTaskEventInErrorDisplay = (taskEvent = {}) => {
  const labelledFields = [
    ['ID', taskEvent.id],
    ['Name', taskEvent.name],
    ['Host', taskEvent.hostname],
    ['Status', taskEvent.status],
  ];

  return labelledFields.map(([label, value]) => h('span', `${label}: ${value}`));
};

/**
 * @private
 * Builds the detail lines for an ODC device event in error: `span` elements for
 * ID, Host and Path, followed by a `.danger` element carrying the error message.
 * When the event has no truthy `error`, the last entry is that falsy value itself
 * instead of an element.
 * @param {OdcDeviceInfoEvent} [odcDeviceEvent={}] - the ODC device event with error information
 * @returns {Array<vnode>} - list of elements describing the ODC device event
 */
const _odcDeviceEventInErrorDisplay = (odcDeviceEvent = {}) => {
  const errorMessage = odcDeviceEvent.error;

  const lines = [
    ['ID', odcDeviceEvent.id],
    ['Host', odcDeviceEvent.hostname],
    ['Path', odcDeviceEvent.path],
  ].map(([label, value]) => h('span', `${label}: ${value}`));

  // Keep the short-circuit result as the last entry, even when it is falsy
  lines.push(errorMessage && h('.danger', `Error: ${errorMessage}`));
  return lines;
};
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ describe(`'EnvironmentCacheService' - test suite`, () => {
isDeploying: undefined,
deploymentError: undefined,
state: 'inactive',
events: []
events: [],
firstTaskInError: undefined,
});
assert.strictEqual(broadcastServiceMock.broadcast.callCount, 1);
});
Expand All @@ -88,6 +89,47 @@ describe(`'EnvironmentCacheService' - test suite`, () => {

assert.ok(environmentCacheService._lastUpdate >= beforeUpdate);
});

it('should preserve the `firstTaskInError` field when updating an existing environment', () => {
const firstTaskInError = {
environmentId: 'env123',
state: 'ERROR',
taskid: 1,
name: 'task1',
hostname: 'host1',
className: 'class1',
isCritical: false,
};

const initialEnvironment = {
id: 'env123',
state: 'RUNNING',
firstTaskInError: firstTaskInError,
};

environmentCacheService.addOrUpdateEnvironment(initialEnvironment);

assert.strictEqual(environmentCacheService._environments.size, 1);
assert.deepStrictEqual(
environmentCacheService._environments.get('env123').firstTaskInError,
firstTaskInError
);

const updatedEnvironment = {
id: 'env123',
state: 'CONFIGURED',
someOtherField: 'newValue',
};

environmentCacheService.addOrUpdateEnvironment(updatedEnvironment);

assert.strictEqual(environmentCacheService._environments.size, 1);
const cachedEnv = environmentCacheService._environments.get('env123');
assert.strictEqual(cachedEnv.state, 'CONFIGURED');
assert.strictEqual(cachedEnv.someOtherField, 'newValue');
assert.deepStrictEqual(cachedEnv.firstTaskInError, firstTaskInError,
'firstTaskInError should be preserved after update');
});
});

describe('`get environments` method', () => {
Expand Down Expand Up @@ -332,12 +374,12 @@ describe(`'EnvironmentCacheService' - test suite`, () => {
environmentCacheService.addOrUpdateEnvironment(initialEnvironment);
const firstTaskInErrorEventSent = {
environmentId: 'env1',
state: 'ERROR',
state: 'ERROR_CRITICAL',
taskid: 1,
name: 'task1',
hostname: 'host1',
className: 'class1',
isCritical: false,
isCritical: true,
};
eventEmitter.emit(TASKS_TRACK, {
timestamp: Date.now(),
Expand Down