aboutsummaryrefslogtreecommitdiffstats
path: root/packages/excalidraw/workers.ts
blob: f5964d08843c596720e4656ca29a9c16af05e875 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import { WorkerInTheMainChunkError, WorkerUrlNotDefinedError } from "./errors";
import { debounce } from "./utils";

/**
 * Pairs a module worker with the debounced callback that eventually terminates it.
 */
class IdleWorker {
  /** The underlying worker, started as an ES module from the URL given to the constructor. */
  public instance: Worker;

  /**
   * Call it to push the worker's termination back by `workerTTL`, or `flush()` it to terminate immediately.
   *
   * Not set by the constructor (hence the definite assignment `!`); the pool assigns it right after creating the worker.
   */
  public debounceTerminate!: ReturnType<typeof debounce>;

  constructor(workerUrl: URL) {
    const moduleWorker = new Worker(workerUrl, { type: "module" });
    this.instance = moduleWorker;
  }
}

/**
 * Pool of idle short-lived workers.
 *
 * Each posted message is handled by a single worker: an idle one is reused when available, otherwise a new one is created.
 * A worker that has answered is put back into the pool and terminated once it stays unused for `ttl` milliseconds.
 *
 * IMPORTANT: for simplicity it does not limit the number of newly created workers, leaving it up to the caller to manage the pool size.
 *
 * @typeParam T - Type of the message posted to the worker.
 * @typeParam R - Type of the data the worker responds with.
 */
export class WorkerPool<T, R> {
  private idleWorkers: Set<IdleWorker> = new Set();
  private readonly workerUrl: URL;
  private readonly workerTTL: number;

  private constructor(
    workerUrl: URL,
    options: {
      ttl?: number;
    },
  ) {
    this.workerUrl = workerUrl;
    // by default, active & idle workers will be terminated after 1s of inactivity
    // (note: any falsy `ttl`, including 0, falls back to this default)
    this.workerTTL = options.ttl || 1000;
  }

  /**
   * Create a new worker pool.
   *
   * @param workerUrl - The URL of the worker file.
   * @param options - The options for the worker pool.
   * @throws If the worker URL is not defined.
   * @throws If the worker is bundled into the main chunk.
   * @returns A new worker pool instance.
   */
  public static create<T, R>(
    workerUrl: URL | undefined,
    options: {
      ttl?: number;
    } = {},
  ): WorkerPool<T, R> {
    if (!workerUrl) {
      throw new WorkerUrlNotDefinedError();
    }

    if (!import.meta.url || workerUrl.toString() === import.meta.url) {
      // in case the worker code is bundled into the main chunk
      throw new WorkerInTheMainChunkError();
    }

    return new WorkerPool(workerUrl, options);
  }

  /**
   * Take idle worker from the pool or create a new one and post a message to it.
   *
   * @param data - The message to post to the worker.
   * @param options - Optional structured serialization options, forwarded to the worker's `postMessage` as is.
   * @returns A promise resolved with the data of the worker's response. It is rejected when the message
   * cannot be posted, when the worker reports an error, or when the worker does not respond within the TTL.
   */
  public async postMessage(
    data: T,
    options?: StructuredSerializeOptions,
  ): Promise<R> {
    let worker: IdleWorker;

    // peek at the first idle worker through the iterator, without copying the whole set
    const idleWorker: IdleWorker | undefined = this.idleWorkers
      .values()
      .next().value;

    if (idleWorker) {
      this.idleWorkers.delete(idleWorker);
      worker = idleWorker;
    } else {
      worker = await this.createWorker();
    }

    return new Promise((resolve, reject) => {
      worker.instance.onmessage = this.onMessageHandler(worker, resolve);
      worker.instance.onerror = this.onErrorHandler(worker, reject);

      try {
        worker.instance.postMessage(data, options);
      } catch (error) {
        // posting may throw synchronously (e.g. when `data` cannot be cloned); at this point the worker
        // is not in the pool and a fresh one has no termination scheduled, so without an explicit
        // termination it would stay alive forever
        worker.debounceTerminate.cancel();
        worker.instance.terminate();
        reject(error);
        return;
      }

      // (re)start the TTL countdown; if it elapses before the response arrives, the job is rejected
      worker.debounceTerminate(() =>
        reject(
          new Error(`Active worker did not respond for ${this.workerTTL}ms!`),
        ),
      );
    });
  }

  /**
   * Terminate the idle workers in the pool.
   */
  public async clear() {
    for (const worker of this.idleWorkers) {
      // drop the pending debounced termination, the worker is terminated right away
      worker.debounceTerminate.cancel();
      worker.instance.terminate();
    }

    this.idleWorkers.clear();
  }

  /**
   * Create a new worker together with its debounced termination callback.
   *
   * The new worker is NOT added to the pool here; it joins the idle set only after it responds to a message.
   */
  private async createWorker(): Promise<IdleWorker> {
    const worker = new IdleWorker(this.workerUrl);

    worker.debounceTerminate = debounce((reject?: () => void) => {
      worker.instance.terminate();

      if (this.idleWorkers.has(worker)) {
        // the worker finished its job earlier and has not been reused since
        this.idleWorkers.delete(worker);

        // eslint-disable-next-line no-console
        console.debug(
          "Job finished! Idle worker has been released from the pool.",
        );
      } else if (reject) {
        // the worker was still active, let the caller of the job know
        reject();
      } else {
        console.error("Worker has been terminated!");
      }
    }, this.workerTTL);

    return worker;
  }

  /**
   * Build the `onmessage` handler of an active worker: the worker is returned to the pool
   * (with its termination rescheduled) and the pending job is resolved with the response data.
   */
  private onMessageHandler(worker: IdleWorker, resolve: (value: R) => void) {
    return (e: { data: R }) => {
      // called without `reject`, so the termination that eventually fires treats the worker as idle
      worker.debounceTerminate();
      this.idleWorkers.add(worker);
      resolve(e.data);
    };
  }

  /**
   * Build the `onerror` handler of a worker: the worker is terminated immediately,
   * the pending job is rejected with the error event and the remaining idle workers are cleared.
   */
  private onErrorHandler(
    worker: IdleWorker,
    reject: (reason: ErrorEvent) => void,
  ) {
    return (e: ErrorEvent) => {
      // terminate the worker immediately before rejection
      worker.debounceTerminate(() => reject(e));
      worker.debounceTerminate.flush();

      // clear the worker pool in case there are some idle workers left
      // (`clear` contains no `await`, so its body has fully run by the time this call returns)
      this.clear();
    };
  }
}