-
Notifications
You must be signed in to change notification settings - Fork 1
feat: Typescript implementation #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Codecov ReportAttention: Patch coverage is
📢 Thoughts on this report? Let us know! |
| this.data = data.data | ||
| this.meta = { | ||
| retryCount: 0, | ||
| startTime: Date.now() + (data.delay ?? 0), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Лучше вообще не писать свойство, если это можно сделать.
И в целом нейминг свойст можно более ужатый
| startTime: Date.now() + (data.delay ?? 0), | ||
| failed: false, | ||
| // TODO: Is this correct? | ||
| timeout: data.timeout ?? 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Это нафиг
| this.meta = { | ||
| retryCount: 0, | ||
| startTime: Date.now() + (data.delay ?? 0), | ||
| failed: false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Это по принципу отсутствие failed === false
| subjects: subjects, | ||
| duplicate_window: nanos(this.duplicateWindow), | ||
| }) | ||
| console.log(`Stream '${this.name}' created successfully.`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Давай логгирование уберем и перейдем на евен емиттинг.
Возможно самое простое - это отнаследоваться от EventEmitter. Реализацию опять таки можно подсмотреть в BullMQ, вроде удобно.
| "@nats-io/kv": "3.0.2", | ||
| "@nats-io/jetstream": "3.0.2", | ||
| "@nats-io/nats-core": "3.0.2", | ||
| "@nats-io/transport-node": "3.0.2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Вот эти зависимости надо указывать в peerDependecies.
И в дев вернуть.
| } | ||
|
|
||
| // TODO: I think this is not needed | ||
| public async close(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Если коннекшн внешний, то можно close убирать.
|
|
||
| public async addJob(job: Job, priority: number = 1): Promise<void> { | ||
| if (this.connection.isClosed()) { | ||
| throw new Error('Cannot add job when NATS connection is closed.') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Наверное, надо хотя начать с того, чтобы все ошибки либы были от кастомного класса
NatsQueueError extends Error типа
| } | ||
|
|
||
| public async addJob(job: Job, priority: number = 1): Promise<void> { | ||
| if (this.connection.isClosed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Нахер
| const DEFAULT_DEDUPLICATE_WINDOW = 2000 | ||
| const MIN_DUPLICATE_WINDOW = 100 | ||
|
|
||
| export type QueueOpts = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Подумай насчет префикса
| @@ -0,0 +1,29 @@ | |||
| import { JobCreateData } from './types' | |||
|
|
|||
| export class Job { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Надо джобу иметь для консьюмера.
И это дает возможность что-то попытаться сделать lazy.
| const msgHeaders = headers() | ||
| msgHeaders.set('Nats-Msg-Id', job.id) | ||
|
|
||
| await this.client.publish(`${job.queueName}.${priority}`, jobData, { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Давайте попробуем
queueName.PRIORITY.jobName
| export type WorkerOpts = { | ||
| client: JetStreamClient | ||
| name: string | ||
| processor: (job: JsMsg, timeout: number) => Promise<void> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timeout минус
Здесь job - то уже наш класс, а не JsMsg
| for (let i = 1; i <= this.priorities; i++) { | ||
| // TODO: Naming might be wrong, independent of the queue name | ||
| const consumerName = `worker_group_${i}` | ||
| const subject = `${this.name}.${i}` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Это name.${i}.*
| `Job: name=${data.name} id=${data.id} is started with data=${data.data} in queue=${data.queueName}`, | ||
| ) | ||
|
|
||
| const timeout = data.meta.timeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Это минус
| ) | ||
| } | ||
|
|
||
| const newId = `${crypto.randomUUID()}_${Date.now()}` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Надо явно это вынести либо в отдельный метод, либо в метод класса джобы.
|
|
||
| protected async fetch(consumer: Consumer, count: number): Promise<JsMsg[]> { | ||
| // TODO: Maybe fail to fetch consumer info | ||
| const consumerInfo = await consumer.info() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Мне кажется не нужно
| filter_subject: subject, | ||
| name: consumerName, | ||
| durable_name: consumerName, | ||
| ack_policy: AckPolicy.All, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Глобальный конкаренси НЕ ДЕЛАЕМ!!
| const awaitedMessages: JsMsg[] = [] | ||
|
|
||
| for await (const msg of msgs) { | ||
| awaitedMessages.push(msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Мне кажется, нечего тут массив собирать, давай процессить сразу.
No description provided.