Skip to content

Conversation

@bjing94
Copy link
Collaborator

@bjing94 bjing94 commented Jul 15, 2025

No description provided.

@codecov-commenter
Copy link

codecov-commenter commented Jul 15, 2025

⚠️ Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

Attention: Patch coverage is 65.66265% with 228 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/worker.ts 60.57% 192 Missing ⚠️
src/queue.ts 75.67% 35 Missing and 1 partial ⚠️

📢 Thoughts on this report? Let us know!

this.data = data.data
this.meta = {
retryCount: 0,
startTime: Date.now() + (data.delay ?? 0),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Лучше вообще не писать свойство, если это можно сделать.
И в целом нейминг свойст можно более ужатый

startTime: Date.now() + (data.delay ?? 0),
failed: false,
// TODO: Is this correct?
timeout: data.timeout ?? 0,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это нафиг

this.meta = {
retryCount: 0,
startTime: Date.now() + (data.delay ?? 0),
failed: false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это по принципу отсутствие failed === false

subjects: subjects,
duplicate_window: nanos(this.duplicateWindow),
})
console.log(`Stream '${this.name}' created successfully.`)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Давай логгирование уберем и перейдем на евен емиттинг.
Возможно самое простое - это отнаследоваться от EventEmitter. Реализацию опять таки можно подсмотреть в BullMQ, вроде удобно.

"@nats-io/kv": "3.0.2",
"@nats-io/jetstream": "3.0.2",
"@nats-io/nats-core": "3.0.2",
"@nats-io/transport-node": "3.0.2"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вот эти зависимости надо указывать в peerDependecies.
И в дев вернуть.

}

// TODO: I think this is not needed
public async close(): Promise<void> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если коннекшн внешний, то можно close убирать.


public async addJob(job: Job, priority: number = 1): Promise<void> {
if (this.connection.isClosed()) {
throw new Error('Cannot add job when NATS connection is closed.')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Наверное, надо хотя начать с того, чтобы все ошибки либы были от кастомного класса
NatsQueueError extends Error типа

}

public async addJob(job: Job, priority: number = 1): Promise<void> {
if (this.connection.isClosed()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нахер

const DEFAULT_DEDUPLICATE_WINDOW = 2000
const MIN_DUPLICATE_WINDOW = 100

export type QueueOpts = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Подумай насчет префикса

@@ -0,0 +1,29 @@
import { JobCreateData } from './types'

export class Job {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Надо джобу иметь для консьюмера.
И это дает возможность что-то попытаться сделать lazy.

const msgHeaders = headers()
msgHeaders.set('Nats-Msg-Id', job.id)

await this.client.publish(`${job.queueName}.${priority}`, jobData, {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Давайте попробуем
queueName.PRIORITY.jobName

export type WorkerOpts = {
client: JetStreamClient
name: string
processor: (job: JsMsg, timeout: number) => Promise<void>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout минус
Здесь job - то уже наш класс, а не JsMsg

for (let i = 1; i <= this.priorities; i++) {
// TODO: Naming might be wrong, independent of the queue name
const consumerName = `worker_group_${i}`
const subject = `${this.name}.${i}`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это name.${i}.*

`Job: name=${data.name} id=${data.id} is started with data=${data.data} in queue=${data.queueName}`,
)

const timeout = data.meta.timeout
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это минус

)
}

const newId = `${crypto.randomUUID()}_${Date.now()}`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Надо явно это вынести либо в отдельный метод, либо в метод класса джобы.


protected async fetch(consumer: Consumer, count: number): Promise<JsMsg[]> {
// TODO: Maybe fail to fetch consumer info
const consumerInfo = await consumer.info()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется не нужно

filter_subject: subject,
name: consumerName,
durable_name: consumerName,
ack_policy: AckPolicy.All,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Глобальный конкаренси НЕ ДЕЛАЕМ!!

const awaitedMessages: JsMsg[] = []

for await (const msg of msgs) {
awaitedMessages.push(msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется, нечего тут массив собирать, давай процессить сразу.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants