Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { MongoClient } from './mongo_client';
import { type InferIdType, TypedEventEmitter } from './mongo_types';
import type { AggregateOptions } from './operations/aggregate';
import type { OperationParent } from './operations/command';
import { DeprioritizedServers } from './sdam/server_selection';
import type { ServerSessionId } from './sessions';
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
Expand Down Expand Up @@ -1073,7 +1074,8 @@ export class ChangeStream<
try {
await topology.selectServer(this.cursor.readPreference, {
operationName: 'reconnect topology in change stream',
timeoutContext: this.timeoutContext
timeoutContext: this.timeoutContext,
deprioritizedServers: new DeprioritizedServers()
});
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
} catch {
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ export type {
TagSet,
TopologyVersion
} from './sdam/server_description';
export type { ServerSelector } from './sdam/server_selection';
export type { DeprioritizedServers, ServerSelector } from './sdam/server_selection';
export type { SrvPoller, SrvPollerEvents, SrvPollerOptions } from './sdam/srv_polling';
export type {
ConnectOptions,
Expand Down
4 changes: 2 additions & 2 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_conc
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import type { ServerMonitoringMode } from './sdam/monitor';
import type { TagSet } from './sdam/server_description';
import { readPreferenceServerSelector } from './sdam/server_selection';
import { DeprioritizedServers, readPreferenceServerSelector } from './sdam/server_selection';
import type { SrvPoller } from './sdam/srv_polling';
import { Topology, type TopologyEvents } from './sdam/topology';
import { ClientSession, type ClientSessionOptions, ServerSessionPool } from './sessions';
Expand Down Expand Up @@ -789,7 +789,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
// to avoid the server selection timeout.
const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred);
const serverDescriptions = Array.from(topologyDescription.servers.values());
const servers = selector(topologyDescription, serverDescriptions);
const servers = selector(topologyDescription, serverDescriptions, new DeprioritizedServers());
if (servers.length !== 0) {
const endSessions = Array.from(client.s.sessionPool.sessions, ({ id }) => id);
if (endSessions.length !== 0) {
Expand Down
11 changes: 6 additions & 5 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import {
} from '../error';
import type { MongoClient } from '../mongo_client';
import { ReadPreference } from '../read_preference';
import type { ServerDescription } from '../sdam/server_description';
import {
DeprioritizedServers,
sameServerSelector,
secondaryWritableServerSelector,
type ServerSelector
Expand Down Expand Up @@ -207,7 +207,8 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
session,
operationName: operation.commandName,
timeoutContext,
signal: operation.options.signal
signal: operation.options.signal,
deprioritizedServers: new DeprioritizedServers()
});

const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
Expand All @@ -234,7 +235,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro

const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
let previousOperationError: MongoError | undefined;
let previousServer: ServerDescription | undefined;
const deprioritizedServers = new DeprioritizedServers();

for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
Expand Down Expand Up @@ -270,7 +271,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
previousServer,
deprioritizedServers,
signal: operation.options.signal
});

Expand Down Expand Up @@ -303,7 +304,7 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
) {
throw previousOperationError;
}
previousServer = server.description;
deprioritizedServers.add(server.description);
previousOperationError = operationError;

// Reset timeouts
Expand Down
Loading