The client program looks like this:
import (
    "context"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

// run opens a cluster-wide change stream starting at the given operation time and
// consumes events until the stream errors out.
func run(cli *mongo.Client, ts primitive.Timestamp) error {
    cs, err := cli.Watch(context.Background(), bson.A{}, &options.ChangeStreamOptions{
        StartAtOperationTime: &ts,
    })
    if err != nil {
        return err
    }
    defer cs.Close(context.Background())
    for cs.Next(context.Background()) {
        var doc bson.D
        if err := bson.Unmarshal(cs.Current, &doc); err != nil {
            return err
        }
        // other code to consume doc
    }
    return cs.Err()
}
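Since the rest of the article revolves around resume tokens, here is a variant of the client (a sketch, not part of the original program, using the same imports as above) that persists the latest token after every event and reopens the stream with resumeAfter; saveToken and loadToken are assumed placeholders for your own storage:

// runResumable resumes from a previously saved resume token if one exists,
// and keeps persisting the newest token handed back by the driver.
func runResumable(cli *mongo.Client, saveToken func(bson.Raw) error, loadToken func() bson.Raw) error {
    opts := options.ChangeStream()
    if tok := loadToken(); tok != nil {
        opts.SetResumeAfter(tok) // tok is the _id / postBatchResumeToken document saved earlier
    }
    cs, err := cli.Watch(context.Background(), mongo.Pipeline{}, opts)
    if err != nil {
        return err
    }
    defer cs.Close(context.Background())
    for cs.Next(context.Background()) {
        // consume cs.Current ...
        if err := saveToken(cs.ResumeToken()); err != nil {
            return err
        }
    }
    return cs.Err()
}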
The rest of this article focuses on how the MongoDB kernel handles change streams, i.e. steps 2~11 above. The analysis is based on the code at https://github.com/mongodb/mongo/tree/v4.2.
Keep those questions in mind while reading the explanation below.
Below, a real change stream usage scenario is used as an example, walking through the processing logic of mongos and mongod in chronological order.
The request:
{
  aggregate: 1,
  pipeline: [
    {
      '$changeStream': {
        allChangesForCluster: true,
        startAtOperationTime: Timestamp({ t: 1698997099, i: 0 }),
      }
    }
  ],
  cursor: {},
  lsid: { id: new UUID("14265384-a9d8-4aa5-a60c-f95d499ea833") },
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1698997490, i: 1 }),
    signature: {
      hash: Binary("000000000000000000000000000000"),
      keyId: Long("0")
    }
  },
  '$db': 'admin'
}
mongos returns a cursor id to the driver:
{
  cursor: {
    firstBatch: {},
    postBatchResumeToken: {_data: "8200000001000000002B0229296E04"},
    id: 9208559579072114181,
    ns: admin.$cmd.aggregate
  },
  ok: 1,
  operationTime: Timestamp({ t: 1698997500, i: 1 }),
  $clusterTime: {
    clusterTime: Timestamp({ t: 1698998143, i: 1 }),
    signature: {
      hash: Binary("00000000000000000000000"),
      keyId: Long("0")
    }
  }
}
Let's look into how mongos handles this request:
ClusterPipelineCommand::run src/mongo/s/commands/cluster_pipeline_cmd.cpp
ClusterAggregate::runAggregate src/mongo/s/query/cluster_aggregate.cpp
LiteParsedPipeline::LiteParsedPipeline
LiteParsedDocumentSource::parse
DocumentSourceChangeStream::LiteParsed::parse
ClusterAggregate::runAggregate
Pipeline::parse // build the pipeline
Pipeline::parseTopLevelOrFacetPipeline
DocumentSource::parse
DocumentSourceChangeStream::createFromBson
buildPipeline
sharded_agg_helpers::dispatchShardPipeline
mustRunOnAllShards
cluster_aggregation_planner::splitPipeline // split the pipeline
findSplitPoint
createCommandForTargetedShards
genericTransformForShards
establishShardCursors // create a cursor on each shard
establishCursors
dispatchMergingPipeline // create the pipeline that runs on mongos
cluster_aggregation_planner::addMergeCursorsSource
DocumentSourceMergeCursors::create
runPipelineOnMongoS // run the pipeline on mongos
establishMergingMongosCursor // establish the overall merging cursor
cluster_aggregation_planner::buildClusterCursor
ccc->next(kInitialFind)
RouterStagePipeline::next
Pipeline::getNext // return EOF
DocumentSourceCloseCursor::getNext
DocumentSourceUpdateOnAddShard::getNext
DocumentSourceMergeCursors::getNext
setPostBatchResumeToken(ccc->getPostBatchResumeToken())
RouterStagePipeline::getPostBatchResumeToken
DocumentSourceMergeCursors::getHighWaterMark
BlockingResultsMerger::getHighWaterMark
AsyncResultsMerger::getHighWaterMark
registerCursor // save the cursor, waiting for getMore requests
The initially constructed pipeline consists of the following stages:
The first four stages are shard stages and never run on mongos. For mongos, the only purpose of these four stages is to generate the command sent to the shards; after that they are discarded. DocumentSourceCloseCursor is a merge stage that has to run on mongos, and the merge sorts by the value of _id._data.
The pipeline that actually runs on mongos is built inside cluster_aggregation_planner::addMergeCursorsSource (data flows from top to bottom):
Once the pipeline is built, getNext is called once to initialize each stage, fetch a cursor id and post batch resume token from every shard, and then return EOF.
The aggregate request that mongos sends to mongod is shown below (its pipeline is identical to the one the driver sent to mongos):
mongod returns a cursor id to mongos:
{
  cursor: {
    firstBatch: {},
    postBatchResumeToken: {_data: "8200000001000000002B0229296E04"},
    id: 9208559579072114181,
    ns: admin.$cmd.aggregate
  },
  $_internalLatestOplogTimestamp: Timestamp({ t: 1698997500, i: 1 }),
  ok: 1,
  operationTime: Timestamp({ t: 1698997500, i: 1 }),
  $gleStats: {
    lastOpTime: Timestamp({ t: 1234356, i: 0 }),
    electionId: ObjectID("7fffffff0000000000000001")
  },
  lastCommittedOpTime: Timestamp({ t: 1698997500, i: 1 }),
  $configServerState: {
    opTime: { ts: Timestamp({ t: 1698998143, i: 1 }), t: Long("1") }
  },
  $clusterTime: {
    clusterTime: Timestamp({ t: 1698998143, i: 1 }),
    signature: {
      hash: Binary("00000000000000000000000"),
      keyId: Long("0")
    }
  }
}
PipelineCommand::Invocation::run
runAggregate src/mongo/db/commands/run_aggregate.cpp
LiteParsedPipeline::LiteParsedPipeline
LiteParsedDocumentSource::parse
DocumentSourceChangeStream::LiteParsed::parse
Pipeline::parse // build the pipeline; mongos goes through this as well
Pipeline::parseTopLevelOrFacetPipeline
DocumentSource::parse
DocumentSourceChangeStream::createFromBson
buildPipeline
DocumentSourceChangeStream::buildMatchFilter
PipelineD::buildInnerQueryExecutor // create the executor underneath the aggregation framework
PipelineD::buildInnerQueryExecutorGeneric
PipelineD::prepareExecutor // generate the execution plan against local.oplog.rs
attemptToGetExecutor
getExecutorFind
_getExecutorFind
getOplogStartHack
WiredTigerRecordStore::oplogStartHack // determine the start position of the collection scan
PipelineD::attachInnerQueryExecutorToPipeline
DocumentSourceCursor::create
PipelineD::addCursorSource
Pipeline::addInitialSource
createOuterPipelineProxyExecutor // add a proxy stage for the change stream
ChangeStreamProxyStage::ChangeStreamProxyStage
PipelineProxyStage::PipelineProxyStage
registerCursor // save the cursor
handleCursorCommand
setPostBatchResumeToken(exec->getPostBatchResumeToken())
PlanExecutorImpl::getPostBatchResumeToken
ChangeStreamProxyStage::getPostBatchResumeToken
Similar to mongos, the initially constructed pipeline consists of the following stages:
After adjustment, the final pipeline looks like this:
The request looks like this:
{
  getMore: Long("6439458890814903597"),
  collection: '$cmd.aggregate',
  lsid: { id: new UUID("14265384-a9d8-4aa5-a60c-f95d499ea833") },
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1698997500, i: 1 }),
    signature: {
      hash: Binary("000000000000000000000000000000000"),
      keyId: Long("0")
    }
  },
  '$db': 'admin'
}
mongos returns data to the driver:
{
  cursor: {
    id: 9208559579072114181,
    ns: admin.$cmd.aggregate,
    nextBatch: {},
    postBatchResumeToken: {_data: "8200000001000000002B0229296E04"}
  },
  ok: 1,
  operationTime: Timestamp({ t: 1698997500, i: 1 }),
  $clusterTime: {
    clusterTime: Timestamp({ t: 1698998143, i: 1 }),
    signature: {
      hash: Binary("00000000000000000000000"),
      keyId: Long("0")
    }
  }
}
ClusterGetMoreCmd::Invocation::run
ClusterFind::runGetMore
cursorManager->checkOutCursor
ClusterCursorManager::PinnedCursor::next
ClusterClientCursorImpl::next
RouterStagePipeline::next
Pipeline::getNext
DocumentSourceCloseCursor::getNext
DocumentSourceUpdateOnAddShard::getNext
DocumentSourceMergeCursors::getNext
BlockingResultsMerger::next
BlockingResultsMerger::awaitNextWithTimeout
BlockingResultsMerger::getNextEvent
AsyncResultsMerger::nextEvent
AsyncResultsMerger::_scheduleGetMores
mongos handles the getMore request from the driver in a fairly straightforward way:
Let's first look at how mongod handles the getMore request and what data it returns to mongos, then at how mongos returns data to the client.
The request looks like this:
Afterwards, mongod returns the data to mongos:
{
  cursor: {
    nextBatch: {},
    postBatchResumeToken: {_data: "8200000001000000002B0229296E04"},
    id: 9208559579072114181,
    ns: admin.$cmd.aggregate
  },
  $_internalLatestOplogTimestamp: Timestamp({ t: 1698997500, i: 1 }),
  ok: 1,
  $gleStats: {
    lastOpTime: Timestamp({ t: 0, i: 0 }),
    electionId: ObjectID("7fffffff0000000000000001")
  },
  lastCommittedOpTime: Timestamp({ t: 1698997500, i: 1 }),
  $configServerState: {
    opTime: { ts: Timestamp({ t: 1698998143, i: 1 }), t: Long("1") }
  },
  $clusterTime: {
    clusterTime: Timestamp({ t: 1698998143, i: 1 }),
    signature: {
      hash: Binary("00000000000000000000000"),
      keyId: Long("0")
    }
  },
  operationTime: Timestamp({ t: 1698997500, i: 1 })
}
There are two important things here: nextBatch and postBatchResumeToken, which are the data and the resume marker respectively. Let's focus on where these two come from.
GetMoreCmd::Invocation::run
GetMoreCmd::Invocation::acquireLocksAndIterateCursor
GetMoreCmd::Invocation::generateBatch
PlanExecutorImpl::getNext
PlanExecutorImpl::_getNextImpl
PlanStage::work
PipelineProxyStage::doWork (ChangeStreamProxyStage inherits from PipelineProxyStage)
ChangeStreamProxyStage::getNextBson
Pipeline::getNext
DocumentSourceCheckResumability::getNext
DocumentSourceCheckInvalidate::getNext
DocumentSourceChangeStreamTransform::getNext
DocumentSourceCursor::getNext
DocumentSourceCursor::loadBatch
...
CollectionScan::doWork
setPostBatchResumeToken(exec->getPostBatchResumeToken())
PlanExecutorImpl::getPostBatchResumeToken
ChangeStreamProxyStage::getPostBatchResumeToken // read _postBatchResumeToken
Just like on mongos, after the cursor is checked out, the pipeline is run. The following walks through the stages in the direction the data flows:
The query looks like this:
There are quite a few filter conditions:
The collection scan does not start at the beginning of the oplog; it starts at the newest oplog entry whose ts is less than or equal to the timestamp specified by the client. If every entry's ts is greater than the client-specified timestamp, the scan starts from the beginning. The scan then checks that first entry; if it fails to satisfy any of the conditions below, the needed oplog has already been rolled over and an ErrorCodes::OplogQueryMinTsMissing exception is thrown.
If the oplog has rolled over on even a single shard, the whole change stream command fails.
You can modify this filter to capture more oplog entries.
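As a side note (not from the original article), the same roll-over condition can be approximated from a client before resuming: read the oldest entry still in a shard's oplog and compare its ts with the timestamp you want to resume from. A minimal Go sketch, connecting to the shard's replica set directly:

import (
    "context"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

// oplogCoversTimestamp reports whether the oldest entry still present in the shard's
// oplog is <= the timestamp we want to resume from, i.e. whether the resume point
// has not been rolled over yet on that shard.
func oplogCoversTimestamp(ctx context.Context, shard *mongo.Client, ts primitive.Timestamp) (bool, error) {
    oplog := shard.Database("local").Collection("oplog.rs")
    opts := options.FindOne().SetSort(bson.D{{Key: "$natural", Value: 1}})

    var first struct {
        Ts primitive.Timestamp `bson:"ts"`
    }
    if err := oplog.FindOne(ctx, bson.D{}, opts).Decode(&first); err != nil {
        return false, err
    }
    // Rolled over if the oldest remaining entry is already newer than the resume timestamp.
    rolledOver := first.Ts.T > ts.T || (first.Ts.T == ts.T && first.Ts.I > ts.I)
    return !rolledOver, nil
}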
Also worth noting: CollectionScan updates _latestOplogEntryTimestamp before returning each document:
Status CollectionScan::setLatestOplogEntryTimestamp(const Record& record) {
    auto tsElem = record.data.toBson()[repl::OpTime::kTimestampFieldName];
    _latestOplogEntryTimestamp = std::max(_latestOplogEntryTimestamp, tsElem.timestamp());
    return Status::OK();
}
DocumentSourceCursor sits on top of CollectionScan and other ordinary query plans, acting as the data source of the aggregate pipeline.
Pay attention to the private member _latestOplogTimestamp, which is updated on every getNext:
DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
    // ......
    // If we are tracking the oplog timestamp, update our cached latest optime.
    if (_trackOplogTS && _exec)
        _updateOplogTimestamp();
    // ......
}

void DocumentSourceCursor::_updateOplogTimestamp() {
    // If we are about to return a result, set our oplog timestamp to the optime of that result.
    if (!_currentBatch.isEmpty()) {
        const auto& ts = _currentBatch.peekFront().getField(repl::OpTime::kTimestampFieldName);
        invariant(ts.getType() == BSONType::bsonTimestamp);
        _latestOplogTimestamp = ts.getTimestamp();  // set to the ts of the oplog entry about to be returned
        return;
    }
    // If we have no more results to return, advance to the latest oplog timestamp.
    _latestOplogTimestamp = _exec->getLatestOplogTimestamp();  // take the timestamp from the collection scan
}

Timestamp CollectionScan::getLatestOplogTimestamp() const {
    return _latestOplogEntryTimestamp;
}
This stage (DocumentSourceChangeStreamTransform) is responsible for converting oplog entries into events.
DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
    while (1) {
        // If we're unwinding an 'applyOps' from a transaction, check if there are any documents we
        // have stored that can be returned.
        if (_txnIterator) {
            if (auto next = _txnIterator->getNextTransactionOp(pExpCtx->opCtx)) {
                return applyTransformation(*next);
            }
            _txnIterator = boost::none;
        }

        // Get the next input document.
        auto input = pSource->getNext();
        if (!input.isAdvanced()) {
            return input;
        }

        auto doc = input.releaseDocument();
        auto op = doc[repl::OplogEntry::kOpTypeFieldName];
        auto opType =
            repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op.getStringData());
        auto commandVal = doc["o"];
        if (opType != repl::OpTypeEnum::kCommand ||
            (commandVal["applyOps"].missing() && commandVal["commitTransaction"].missing())) {
            return applyTransformation(doc);
        }

        // The only two commands we will see here are an applyOps or a commit, which both mean we
        // need to open a "transaction context" representing a group of updates that all occurred at
        // once as part of a transaction.
        invariant(!_txnIterator);
        _txnIterator.emplace(pExpCtx->opCtx, pExpCtx->mongoProcessInterface, doc, *_nsRegex);
    }
}
applyOps entries are unpacked and processed op by op. Ordinary oplog entries are transformed directly:
Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
    MutableDocument doc;

    // Extract the fields we need.
    string op = input[repl::OplogEntry::kOpTypeFieldName].getString();
    Value ts = input[repl::OplogEntry::kTimestampFieldName];
    Value ns = input[repl::OplogEntry::kNssFieldName];
    Value uuid = input[repl::OplogEntry::kUuidFieldName];
    std::vector<FieldPath> documentKeyFields;

    // Deal with CRUD operations and commands.
    auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op);
    NamespaceString nss(ns.getString());

    // Ignore commands in the oplog when looking up the document key fields since a command implies
    // that the change stream is about to be invalidated (e.g. collection drop).
    if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) {
        documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields;
    }

    Value id = input.getNestedField("o._id");
    // Non-replace updates have the _id in field "o2".
    StringData operationType;
    Value fullDocument;
    Value updateDescription;
    Value documentKey;

    switch (opType) {
        case repl::OpTypeEnum::kInsert: {
            operationType = DocumentSourceChangeStream::kInsertOpType;
            fullDocument = input[repl::OplogEntry::kObjectFieldName];
            documentKey = Value(document_path_support::extractPathsFromDoc(
                fullDocument.getDocument(), documentKeyFields));
            break;
        }
        case repl::OpTypeEnum::kDelete: {
            operationType = DocumentSourceChangeStream::kDeleteOpType;
            documentKey = input[repl::OplogEntry::kObjectFieldName];
            break;
        }
        case repl::OpTypeEnum::kUpdate: {
            if (id.missing()) {
                // non-replace style update
                operationType = DocumentSourceChangeStream::kUpdateOpType;
                // ......
            } else {
                // replace style update
                operationType = DocumentSourceChangeStream::kReplaceOpType;
                fullDocument = input[repl::OplogEntry::kObjectFieldName];
            }
            documentKey = input[repl::OplogEntry::kObject2FieldName];
            break;
        }
        case repl::OpTypeEnum::kCommand: {
            if (!input.getNestedField("o.drop").missing()) {
                operationType = DocumentSourceChangeStream::kDropCollectionOpType;
                nss = NamespaceString(nss.db(), input.getNestedField("o.drop").getString());
            } else if (!input.getNestedField("o.renameCollection").missing()) {
                operationType = DocumentSourceChangeStream::kRenameCollectionOpType;
                // ......
            } else if (!input.getNestedField("o.dropDatabase").missing()) {
                operationType = DocumentSourceChangeStream::kDropDatabaseOpType;
                nss = NamespaceString(nss.db());
            } else {
                // All other commands will invalidate the stream.
                operationType = DocumentSourceChangeStream::kInvalidateOpType;
            }
            // Make sure the result doesn't have a document key.
            documentKey = Value();
            break;
        }
        case repl::OpTypeEnum::kNoop: {
            operationType = DocumentSourceChangeStream::kNewShardDetectedOpType;
            // Generate a fake document Id for NewShardDetected operation so that we can resume
            // after this operation.
            documentKey = Value(Document{{DocumentSourceChangeStream::kIdField,
                                          input[repl::OplogEntry::kObject2FieldName]}});
            break;
        }
        default: { MONGO_UNREACHABLE; }
    }

    // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
    // in the output.
    auto resumeTokenData = getResumeToken(ts, uuid, documentKey);
    auto resumeToken = ResumeToken(resumeTokenData).toDocument();

    // Add some additional fields only relevant to transactions.
    if (_txnIterator) {
        doc.addField(DocumentSourceChangeStream::kTxnNumberField,
                     Value(static_cast<long long>(_txnIterator->txnNumber())));
        doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnIterator->lsid()));
    }

    doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken));
    doc.addField(DocumentSourceChangeStream::kOperationTypeField, Value(operationType));
    doc.addField(DocumentSourceChangeStream::kClusterTimeField, Value(resumeTokenData.clusterTime));
    doc.setSortKeyMetaField(resumeToken.toBson());
    // ......
    doc.addField(DocumentSourceChangeStream::kDocumentKeyField, documentKey);
    // ......
}
This is where CRUD operations and some DDL operations get converted into events. Note that you can modify this code to support converting more types of oplog entries.
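For reference, after this transformation an ordinary insert ends up as an event shaped roughly like the following (field values here are placeholders, not taken from the trace above):

{
  _id: { _data: "82...", _typeBits: Binary("...") },
  operationType: 'insert',
  clusterTime: Timestamp({ t: 1698997600, i: 1 }),
  fullDocument: { _id: ObjectId("..."), name: 'foo' },
  ns: { db: 'mydb', coll: 'mycoll' },
  documentKey: { _id: ObjectId("...") }
}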
This stage (DocumentSourceCheckInvalidate) inspects each event; if it meets the invalidation conditions (which depend on the level the stream was opened at, e.g. a drop or rename of the watched collection), it emits an invalidate-type event.
This stage (DocumentSourceCheckResumability) handles resuming from a previous resume point.
DocumentSource::GetNextResult DocumentSourceCheckResumability::getNext() {
    if (_resumeStatus == ResumeStatus::kSurpassedToken) {
        return pSource->getNext();
    }

    while (_resumeStatus != ResumeStatus::kSurpassedToken) {
        // The underlying oplog scan will throw OplogQueryMinTsMissing if the minTs in the change
        // stream filter has fallen off the oplog. Catch this and throw a more explanatory error.
        auto nextInput = [this]() {
            try {
                return pSource->getNext();
            } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
                uasserted(ErrorCodes::ChangeStreamHistoryLost,
                          "Resume of change stream was not possible, as the resume point may no "
                          "longer be in the oplog.");
            }
        }();

        // If we hit EOF, return it immediately.
        if (!nextInput.isAdvanced()) {
            return nextInput;
        }

        // Determine whether the current event sorts before, equal to or after the resume token.
        _resumeStatus =
            compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
        switch (_resumeStatus) {
            case ResumeStatus::kCheckNextDoc:
                // If the result was kCheckNextDoc, we are resumable but must swallow this event.
                continue;
            case ResumeStatus::kSurpassedToken:
                // In this case the resume token wasn't found; it may be on another shard. However,
                // since the oplog scan did not throw, we know that we are resumable. Fall through
                // into the following case and return the document.
            case ResumeStatus::kFoundToken:
                // We found the actual token! Return the doc so DSEnsureResumeTokenPresent sees it.
                return nextInput;
        }
    }
    MONGO_UNREACHABLE;
}
This getNext works as follows: it swallows events that sort before the client's resume token and only starts returning documents from the token (or the first event past it) onward. The goal is to avoid returning events the client has already seen.
boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
    if (auto next = _pipeline->getNext()) {
        auto nextBSON = _validateAndConvertToBSON(*next);
        _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
        _postBatchResumeToken = next->getSortKeyMetaField();
        _setSpeculativeReadTimestamp();
        return nextBSON;
    }

    // We ran out of results to return.
    auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
    if (highWaterMark > _latestOplogTimestamp) {
        auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
        _postBatchResumeToken = token.toDocument().toBson();
        _latestOplogTimestamp = highWaterMark;
        _setSpeculativeReadTimestamp();
    }
    return boost::none;
}
PipelineD::getLatestOplogTimestamp() returns exactly DocumentSourceCursor::_latestOplogTimestamp.
The main job of this stage (ChangeStreamProxyStage) is to maintain the postBatchResumeToken variable, which is the resume token used for resuming the stream.
Why is a high-water-mark resume token needed?
Imagine a client watching a collection that never gets any writes. If postBatchResumeToken never advanced, the client would eventually try to resume with a very old token and find that resuming is impossible. The point of the high-water-mark resume token is to keep moving the synchronization point forward even when no events the client cares about are produced.
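On the driver side this is exactly what the post-batch resume token gives you: even when no events arrive, the token returned by the server keeps advancing. A minimal Go sketch that checkpoints the token while the stream is idle (persistToken is an assumed placeholder; set MaxAwaitTime on the ChangeStreamOptions so TryNext waits server-side instead of busy-polling):

import (
    "context"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
)

// tailAndCheckpoint keeps saving the newest resume token even while the stream is idle.
// When the current batch is empty, ResumeToken() returns the postBatchResumeToken,
// i.e. the high-water-mark the server has promised.
func tailAndCheckpoint(ctx context.Context, cs *mongo.ChangeStream, persistToken func(bson.Raw) error) error {
    defer cs.Close(ctx)
    for {
        for cs.TryNext(ctx) {
            // consume cs.Current ...
        }
        if err := cs.Err(); err != nil {
            return err
        }
        if tok := cs.ResumeToken(); tok != nil {
            if err := persistToken(tok); err != nil {
                return err
            }
        }
    }
}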
Events on a single shard are totally ordered; across shards, events are partially ordered thanks to the hybrid logical clock. The job of mongos is to gather the events from all shards and sort them into one global order.
ClusterGetMoreCmd::Invocation::run
ClusterFind::runGetMore
cursorManager->checkOutCursor
ClusterCursorManager::PinnedCursor::next
ClusterClientCursorImpl::next
RouterStagePipeline::next
Pipeline::getNext
DocumentSourceCloseCursor::getNext
DocumentSourceUpdateOnAddShard::getNext
DocumentSourceMergeCursors::getNext
BlockingResultsMerger::next
BlockingResultsMerger::awaitNextWithTimeout
BlockingResultsMerger::getNextEvent
AsyncResultsMerger::nextEvent (the whole function holds _mutex)
AsyncResultsMerger::_scheduleGetMores
AsyncResultsMerger::_askForNextBatch
TaskExecutor::scheduleRemoteCommand
AsyncResultsMerger::_handleBatchResponse
AsyncResultsMerger::_processBatchResults
AsyncResultsMerger::_addBatchToBuffer
AsyncResultsMerger::_updateRemoteMetadata
AsyncResultsMerger::nextReady
AsyncResultsMerger::_nextReadySorted
Gathering and sorting the events from all shards is actually quite simple: it is a merge sort.
The core component is AsyncResultsMerger. It keeps one docBuffer queue per shard, holding the events from that shard's getMore responses. On each step it compares the front document of every docBuffer queue by _id._data and returns the smallest one.
One problem needs to be considered here: what should mongos do when some shard's docBuffer is currently empty?
If the buffer is empty because the shard is slow to respond, mongos must keep waiting, otherwise the global order cannot be guaranteed; if it is empty because that shard simply has no events, there is no need to wait.
So should mongos wait or not? How can both cases be handled?
This is where the Post Batch Resume Token (PBRT) and minPromisedSortKey come in. We saw earlier how mongod produces the PBRT: depending on whether EOF was hit, it is either the resume token of the most recent event or a high-water-mark resume token built from the newest scanned oplog timestamp. The PBRT can also be read as a promise from the shard to mongos: every event returned later will be greater than or equal to this timestamp. mongos takes the minimum PBRT across all shards, called the minPromisedSortKey, and with it the problem above can be solved.
The solution: before returning an event, mongos compares it with the minPromisedSortKey and only emits it if its sort key is less than or equal to the minPromisedSortKey.
Overall, the strategy mongos follows looks like this:
bool AsyncResultsMerger::_readySortedTailable(WithLock lk) {
    if (_mergeQueue.empty()) {
        return false;
    }

    auto smallestRemote = _mergeQueue.top();
    auto smallestResult = _remotes[smallestRemote].docBuffer.front();
    auto keyWeWantToReturn =
        extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey());

    // We should always have a minPromisedSortKey from every shard in the sorted tailable case.
    auto minPromisedSortKey = _getMinPromisedSortKey(lk);
    invariant(!minPromisedSortKey.isEmpty());
    return compareSortKeys(keyWeWantToReturn, minPromisedSortKey, *_params.getSort()) <= 0;
}
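To make the decision concrete, here is a simplified, self-contained sketch of the same gating logic (illustrative Go, not MongoDB code); it relies on the fact that the hex-encoded sort keys compare correctly as plain strings:

package main

import "fmt"

// shardState is a simplified stand-in for one remote's state in AsyncResultsMerger:
// buffered event sort keys plus the sort key that shard has promised not to go below.
type shardState struct {
    buffer   []string // sort keys (_id._data) of buffered events, ascending
    promised string   // that shard's latest post-batch resume token sort key
}

// nextReady returns the index of the shard whose front event may be emitted now,
// or -1 if mongos must wait for more batches to preserve the global order.
func nextReady(shards []shardState) int {
    minPromised := ""
    smallest := -1
    for i, s := range shards {
        if minPromised == "" || s.promised < minPromised {
            minPromised = s.promised
        }
        if len(s.buffer) > 0 && (smallest == -1 || s.buffer[0] < shards[smallest].buffer[0]) {
            smallest = i
        }
    }
    // Emit only if every shard has promised it will not produce anything smaller.
    if smallest != -1 && shards[smallest].buffer[0] <= minPromised {
        return smallest
    }
    return -1
}

func main() {
    shards := []shardState{
        {buffer: []string{"8264A1"}, promised: "8264A5"},
        {buffer: nil, promised: "8264A0"}, // idle shard with a low promise
    }
    // Prints -1: shard 1 could still return an event whose key lies between "8264A0" and "8264A1".
    fmt.Println(nextReady(shards))
}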
In this way, mongos can emit a globally ordered stream of data to the client. Every response mongos sends to the client carries a batch of events plus a postBatchResumeToken field; postBatchResumeToken is the resume token used to resume the stream. The value of this field comes from AsyncResultsMerger::getHighWaterMark (see the call chain in the earlier section on creating the cursor).
DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::getNext() {
    auto childResult = pSource->getNext();

    while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) {
        addNewShardCursors(childResult.getDocument());
        childResult = pSource->getNext();
    }
    return childResult;
}

// Returns true if the change stream document has an 'operationType' of 'newShardDetected'.
bool needsUpdate(const Document& childResult) {
    return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
        DocumentSourceChangeStream::kNewShardDetectedOpType;
}
When a chunk is migrated to a new shard, a kNewShardDetectedOpType event is produced. When this stage sees an event of this type, it establishes a cursor to the new shard.
DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
    pExpCtx->checkForInterrupt();

    // Close cursor if we have returned an invalidate entry.
    if (_shouldCloseCursor) {
        uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated");
    }

    auto nextInput = pSource->getNext();
    if (!nextInput.isAdvanced())
        return nextInput;

    auto doc = nextInput.getDocument();
    const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField;
    auto operationType = doc[kOperationTypeField].getString();
    if (operationType == DocumentSourceChangeStream::kInvalidateOpType) {
        _shouldCloseCursor = true;
    }

    return nextInput;
}
When a document's operationType field is invalidate, the next call to getNext throws an exception and the cursor is closed.
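From the client's perspective, a collection-level stream therefore ends after drop/rename style events. One common pattern with the Go driver (a sketch, not from the article; imports as in the client program at the top) is to remember the invalidate event's resume token and reopen the stream with startAfter, which is supported since 4.2:

// watchWithRestart reopens the change stream after an invalidate event, using the
// invalidate event's own resume token as the startAfter point.
func watchWithRestart(ctx context.Context, coll *mongo.Collection) error {
    var startAfter bson.Raw
    for {
        opts := options.ChangeStream()
        if startAfter != nil {
            opts.SetStartAfter(startAfter) // resumeAfter is not allowed past an invalidate
        }
        cs, err := coll.Watch(ctx, mongo.Pipeline{}, opts)
        if err != nil {
            return err
        }
        for cs.Next(ctx) {
            var ev struct {
                OperationType string `bson:"operationType"`
            }
            if err := bson.Unmarshal(cs.Current, &ev); err != nil {
                cs.Close(ctx)
                return err
            }
            if ev.OperationType == "invalidate" {
                startAfter = cs.ResumeToken()
                break
            }
            // handle other events ...
        }
        err = cs.Err()
        cs.Close(ctx)
        if err != nil {
            return err
        }
    }
}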
The resume token is used both to order events and to resume the change stream.
struct ResumeTokenData {
    enum FromInvalidate : bool {
        kFromInvalidate = true,
        kNotFromInvalidate = false,
    };
    enum TokenType : int {
        kHighWaterMarkToken = 0,  // Token refers to a point in time, not an event.
        kEventToken = 128,        // Token refers to an actual event in the stream.
    };

    Timestamp clusterTime;  // for a kEventToken, this equals the cluster time of the oplog entry
    int version = 1;
    TokenType tokenType = TokenType::kEventToken;
    size_t txnOpIndex = 0;
    // Flag to indicate that this resume token is from an "invalidate" entry. This will not be set
    // on a token from a command that *would* invalidate a change stream, but rather the invalidate
    // notification itself.
    FromInvalidate fromInvalidate = FromInvalidate::kNotFromInvalidate;
    boost::optional<UUID> uuid;
    Value documentKey;  // _id + shard key
};

/* {
 *   _data: String, A hex encoding of the binary generated by keystring encoding the clusterTime,
 *          version, txnOpIndex, UUID, then documentKey in that order.
 *   _typeBits: BinData - The keystring type bits used for deserialization.
 * }
 */
class ResumeToken {
public:
    constexpr static StringData kDataFieldName = "_data"_sd;
    constexpr static StringData kTypeBitsFieldName = "_typeBits"_sd;

    // Converts a ResumeTokenData into the hex string form.
    explicit ResumeToken(const ResumeTokenData& resumeValue);

    // Converts the hex string back into a ResumeTokenData.
    ResumeTokenData getData() const;

private:
    // This is the hex-encoded string encoding all the pieces of the resume token.
    std::string _hexKeyString;

    // Since we are using a KeyString encoding, we might lose some information about what the
    // original types of the serialized values were. For example, the integer 2 and the double 2.0
    // will generate the same KeyString. We keep the type bits around so we can deserialize without
    // losing information.
    Value _typeBits;  // not involved in comparisons; this field is often ignored
};
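Because the KeyString hex encoding in _data is order-preserving, two resume tokens can be ordered by comparing their _data strings alone; _typeBits only matters when decoding a token back into its components. An illustrative Go helper (assumed name, not a driver API; assumes _data is stored as a string, as it is in 4.2 tokens):

import (
    "strings"

    "go.mongodb.org/mongo-driver/bson"
)

// compareResumeTokens orders two resume token documents ({_data: ..., _typeBits: ...})
// by their _data field; _typeBits is deliberately ignored since it does not affect ordering.
func compareResumeTokens(a, b bson.Raw) int {
    return strings.Compare(a.Lookup("_data").StringValue(), b.Lookup("_data").StringValue())
}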
To sort events, each one needs to be described as precisely as possible, ideally so that every event has a unique value. Once a global order can be established, resuming is no longer hard.
ResumeTokenData contains the following fields:
Finally, a summary of all these timestamps:
Incomplete DDL support:
Starting in MongoDB 6.0, change streams support change notifications for DDL events, like the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor using the showExpandedEvents option.
Latency:
Sharded clusters with one or more shards that have little or no activity for the collection, or are “cold”, can negatively affect the response time of the change stream as the mongos must still check with those cold shards to guarantee total ordering of changes. To minimize latency for cold shards, you can specify a lower periodicNoopIntervalSecs value.
Orphaned documents:
For sharded collections, update operations with multi : true may cause any change streams opened against that collection to send notifications for orphaned documents. Starting in MongoDB 5.3, during range migration, change stream events are not generated for updates to orphaned documents.
Performance:
If a sharded collection has high levels of activity, the mongos may not be able to keep up with the changes across all of the shards. Consider utilizing notification filters for these types of collections. For example, passing a $match pipeline configured to filter only insert operations.
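For example, with the Go driver a $match stage can be appended after $changeStream so that only inserts are returned (a sketch; the collection handle is assumed to already exist):

import (
    "context"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
)

// watchInsertsOnly opens a change stream that only reports insert events,
// reducing the volume of events mongos has to merge and sort.
func watchInsertsOnly(ctx context.Context, coll *mongo.Collection) (*mongo.ChangeStream, error) {
    pipeline := mongo.Pipeline{
        {{Key: "$match", Value: bson.D{{Key: "operationType", Value: "insert"}}}},
    }
    return coll.Watch(ctx, pipeline)
}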
A change stream uses one thread on mongos and one on each mongod. When the cluster's write volume is high, that thread on the involved mongos/mongod often runs at 100% CPU.
Replica set flame graph:
mongod CPU time is mostly spent in the match and transform stages.
Sharded cluster mongos flame graph: mongos CPU time is mostly spent on sorting (comparing resume tokens) and on converting between Document and BSON.
Sharded cluster mongod flame graph: same as in a replica set, mongod CPU time is mostly spent on match and transform.
Before version 4.2, $changeStream used the majority read concern and no other read concern could be explicitly specified. In 4.2 and later, other read concerns can be used.
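With the Go driver, the read concern a change stream uses is taken from the database or collection handle it is opened on; a sketch that pins it explicitly to majority (database and collection names are placeholders):

import (
    "context"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "go.mongodb.org/mongo-driver/mongo/readconcern"
)

// openStream opens a change stream on a collection handle configured with an explicit
// read concern; before 4.2 only majority is accepted for $changeStream.
func openStream(ctx context.Context, cli *mongo.Client) (*mongo.ChangeStream, error) {
    coll := cli.Database("mydb").
        Collection("mycoll", options.Collection().SetReadConcern(readconcern.Majority()))
    return coll.Watch(ctx, mongo.Pipeline{})
}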
A change stream request is always sent to every shard:
bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) {
    // The following aggregations must be routed to all shards:
    // - Any collectionless aggregation, such as non-localOps $currentOp.
    // - Any aggregation which begins with a $changeStream stage.
    return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
}
Dropping a collection that is distributed across multiple shards produces multiple drop events for that namespace, one from each shard.
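If the consumer needs exactly one notification per dropped namespace, a simple client-side guard (a sketch, not from the article) is to suppress consecutive duplicate drop events for the same namespace; interleaved events from other namespaces would defeat it, which is acceptable for an illustration:

// dropDeduper suppresses the extra drop events a sharded collection produces
// (one per shard that owned chunks of it). It only catches consecutive duplicates.
type dropDeduper struct {
    lastDroppedNS string
}

// shouldEmit reports whether the event should be passed on to the application.
func (d *dropDeduper) shouldEmit(opType, db, coll string) bool {
    if opType != "drop" {
        d.lastDroppedNS = ""
        return true
    }
    ns := db + "." + coll
    if ns == d.lastDroppedNS {
        return false // duplicate drop reported by another shard
    }
    d.lastDroppedNS = ns
    return true
}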
The explain output on mongos:
You can also obtain the explain output with the following commands:
db.aggregate([{"$changeStream":{allChangesForCluster:true, startAtOperationTime:Timestamp({ t: 11, i: 0 })}}], {explain:true})
or
db.runCommand({"aggregate":1, "pipeline":[{"$changeStream":{allChangesForCluster:true, startAtOperationTime:Timestamp({ t: 11, i: 0 })}}] , cursor:{}, explain:true})
The shards field holds each shard's execution plan; it contains some Arrays and Objects that are collapsed and not shown here. They are covered in the mongod section above.
static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd;
The form (?!pattern) is a zero-width negative lookahead assertion.
It asserts that the text at that position does not match pattern. For example, with admin.(?!(\$|system\.)), namespaces beginning with admin.$ or admin.system. are excluded.