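mongod source analysis: how the WiredTiger storage engine (StorageEngine) is instantiated
When mongod starts, the storage engine is initialized in the initializeStorageEngine() function in src/mongo/db/storage/storage_engine_init.cpp.
After MongoDB starts, the data directory contains several important files: mongod.lock, storage.bson, WiredTiger.lock, WiredTiger, WiredTiger.wt, sizeStorer, journal, and so on.
mongod.lock is the lock file MongoDB uses to keep multiple mongod instances from accessing the same data directory at the same time; what gets written into mongod.lock is the ID of the current process (PID).
Crash recovery: if the server terminates abnormally, mongod.lock is left non-empty. On the next startup createLockFile() detects this, logs an "unclean shutdown" warning, and marks the startup as following an unclean shutdown; in read-only mode startup is aborted instead.
The main implementation is in src/mongo/db/storage/storage_engine_lock_file.cpp, where the StorageEngineLockFile class creates and manages the lock file.
storage.bson is the file in which MongoDB persists storage engine metadata. It mainly records the name of the storage engine that created the data files (for example WiredTiger or MMAPv1) and the engine options that must stay consistent across restarts; at startup these are validated against the current startup options through validateMetadata().
initializeStorageEngine() does five main things: (1) create and validate mongod.lock; (2) check the data path for an existing storage.bson and validate it against the requested engine; (3) look up the engine factory; (4) create the WiredTiger engine; (5) finally, write the process ID into mongod.lock.
The call chain for instantiating the WiredTiger StorageEngine in the mongod source is as follows: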
- mongo/db/db.cpp: main
- mongo/db/db.cpp: mongoDbMain
- mongo/db/db.cpp: initAndListen
- mongo/db/db.cpp: _initAndListen
- mongo/db/storage/storage_engine_init.cpp: initializeStorageEngine
- mongo/db/storage/storage_engine_init.cpp: createLockFile
- mongo/db/storage/storage_engine_lock_file.h: StorageEngineLockFile constructor
- mongo/db/storage/storage_engine_lock_file_windows.cpp: open
- mongo/db/storage/storage_engine_metadata.cpp: getStorageEngineForPath
- mongo/db/storage/storage_engine_metadata.cpp: forPath
- mongo/db/storage/storage_engine_init.cpp: getFactoryForStorageEngine
- mongo/db/storage/storage_engine.h: StorageEngine::Factory::create
- mongo/db/storage/wiredtiger/wiredtiger_init.cpp: create
- mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp: WiredTigerKVEngine constructor
- mongo/db/storage/storage_engine_lock_file.cpp: writePid
[Figure: flowchart of the source code call chain listed above]
In mongo/db/db.cpp the chain is main -> mongoDbMain -> initAndListen -> _initAndListen. _initAndListen initializes many pieces of runtime state; this article covers only the storage engine instantiation flow.
#if defined(_WIN32)
int wmain(int argc, wchar_t* argvW[], wchar_t* envpW[]) {
    mongo::WindowsCommandLine wcl(argc, argvW, envpW);
    std::cout << "conca " << " this is win32...";
    int exitCode = mongo::mongoDbMain(argc, wcl.argv(), wcl.envp());
    mongo::quickExit(exitCode);
}
#else
int main(int argc, char* argv[], char** envp) {
    int exitCode = mongo::mongoDbMain(argc, argv, envp);
    mongo::quickExit(exitCode);
}
#endif

int mongoDbMain(int argc, char* argv[], char** envp) {
    ...
    ExitCode exitCode = initAndListen(serverGlobalParams.port);
    return 0;
}

ExitCode initAndListen(int listenPort) {
    try {
        return _initAndListen(listenPort);
    } catch (DBException& e) {
        log() << "exception in initAndListen: " << e.toString() << ", terminating";
        return EXIT_UNCAUGHT;
    } catch (std::exception& e) {
        log() << "exception in initAndListen std::exception: " << e.what() << ", terminating";
        return EXIT_UNCAUGHT;
    } catch (int& n) {
        log() << "exception in initAndListen int: " << n << ", terminating";
        return EXIT_UNCAUGHT;
    } catch (...) {
        log() << "exception in initAndListen, terminating";
        return EXIT_UNCAUGHT;
    }
}

ExitCode _initAndListen(int listenPort) {
    Client::initThread("initandlisten");
    ...
    initializeStorageEngine(serviceContext, StorageEngineInitFlags::kNone);
}
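The wrapper pattern used by initAndListen, where every exception thrown during startup is turned into an exit code, can be sketched in isolation. This is a minimal sketch, not MongoDB code: `ExitCode`, `innerInit`, and `guardedInit` are hypothetical names, and the numeric exit code values are placeholders chosen for illustration.

```cpp
#include <exception>
#include <iostream>
#include <stdexcept>

// Hypothetical exit codes for illustration only.
enum ExitCode { kExitClean = 0, kExitUncaught = 100 };

// Stand-in for _initAndListen: throws when `fail` is true.
ExitCode innerInit(bool fail) {
    if (fail) {
        throw std::runtime_error("storage engine failed to start");
    }
    return kExitClean;
}

// Mirrors the shape of initAndListen: every exception becomes an exit code,
// so nothing escapes to the caller.
ExitCode guardedInit(bool fail) {
    try {
        return innerInit(fail);
    } catch (const std::exception& e) {
        std::cerr << "exception in init: " << e.what() << ", terminating\n";
        return kExitUncaught;
    } catch (...) {
        std::cerr << "unknown exception in init, terminating\n";
        return kExitUncaught;
    }
}
```

Calling `guardedInit(true)` returns `kExitUncaught` instead of propagating the exception, which is why a failure inside initializeStorageEngine ends the process with an exit code rather than an unhandled exception.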
initializeStorageEngine in storage_engine_init.cpp is the core function. Its main code blocks are:
1. Create and validate mongod.lock: createLockFile(service);
2. Check the data path for an existing storage.bson and validate it: if (auto existingStorageEngine = StorageEngineMetadata::getStorageEngineForPath(dbpath))
3. Look up the engine factory: StorageEngine::Factory* factory = getFactoryForStorageEngine(service, storageGlobalParams.engine);
4. Create the WiredTiger engine: factory->create(storageGlobalParams, lockFile ? &*lockFile : nullptr)
5. Finally, write the process ID into mongod.lock: lockFile->writePid();
void initializeStorageEngine(ServiceContext* service, const StorageEngineInitFlags initFlags) {
    // This should be set once.
    invariant(!service->getStorageEngine());

    if (0 == (initFlags & StorageEngineInitFlags::kAllowNoLockFile)) {
        createLockFile(service);
    }

    const std::string dbpath = storageGlobalParams.dbpath;
    ...
    if (auto existingStorageEngine = StorageEngineMetadata::getStorageEngineForPath(dbpath)) {
        if (storageGlobalParams.engineSetByUser) {
            // Verify that the name of the user-supplied storage engine matches the contents of
            // the metadata file.
            const StorageEngine::Factory* factory =
                getFactoryForStorageEngine(service, storageGlobalParams.engine);
            if (factory) {
                uassert(28662,
                        str::stream()
                            << "Cannot start server. Detected data files in " << dbpath
                            << " created by"
                            << " the '" << *existingStorageEngine << "' storage engine, but the"
                            << " specified storage engine was '" << factory->getCanonicalName()
                            << "'.",
                        factory->getCanonicalName() == *existingStorageEngine);
            }
        } else {
            // Otherwise set the active storage engine as the contents of the metadata file.
            log() << "Detected data files in " << dbpath << " created by the '"
                  << *existingStorageEngine << "' storage engine, so setting the active"
                  << " storage engine to '" << *existingStorageEngine << "'.";
            storageGlobalParams.engine = *existingStorageEngine;
        }
    }

    const StorageEngine::Factory* factory =
        getFactoryForStorageEngine(service, storageGlobalParams.engine);
    uassert(18656,
            str::stream() << "Cannot start server with an unknown storage engine: "
                          << storageGlobalParams.engine,
            factory);

    if (storageGlobalParams.readOnly) {
        uassert(34368,
                str::stream()
                    << "Server was started in read-only mode, but the configured storage engine, "
                    << storageGlobalParams.engine << ", does not support read-only operation",
                factory->supportsReadOnly());
    }

    std::unique_ptr<StorageEngineMetadata> metadata = StorageEngineMetadata::forPath(dbpath);
    if (storageGlobalParams.readOnly) {
        uassert(34415,
                "Server was started in read-only mode, but the storage metadata file was not"
                " found.",
                metadata.get());
    }

    // Validate options in metadata against current startup options.
    if (metadata.get()) {
        uassertStatusOK(factory->validateMetadata(*metadata, storageGlobalParams));
    }

    auto guard = makeGuard([&] {
        auto& lockFile = StorageEngineLockFile::get(service);
        if (lockFile) {
            lockFile->close();
        }
    });

    auto& lockFile = StorageEngineLockFile::get(service);
    service->setStorageEngine(std::unique_ptr<StorageEngine>(
        factory->create(storageGlobalParams, lockFile ? &*lockFile : nullptr)));
    service->getStorageEngine()->finishInit();

    if (lockFile) {
        uassertStatusOK(lockFile->writePid());
    }
    ....
}
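The makeGuard call above registers cleanup that closes the lock file if engine creation throws. The following is a minimal, self-contained sketch of that scope-guard idea. `ScopeGuard`, `FakeLockFile`, and `initEngine` are hypothetical names, and the `dismiss()` call on the success path is an assumption about the elided part of the function, not something shown in the excerpt.

```cpp
#include <functional>
#include <stdexcept>
#include <utility>

// Minimal scope guard: runs `fn` on destruction unless dismiss() was called.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> fn) : _fn(std::move(fn)) {}
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;
    ~ScopeGuard() {
        if (_active) {
            _fn();
        }
    }
    void dismiss() {
        _active = false;
    }

private:
    std::function<void()> _fn;
    bool _active = true;
};

// Hypothetical stand-in for the lock file: only tracks open/closed state.
struct FakeLockFile {
    bool isOpen = true;
    void close() {
        isOpen = false;
    }
};

// Returns true on success. If engine creation throws, the guard closes the
// lock file while the exception propagates to the caller.
bool initEngine(FakeLockFile& lockFile, bool engineCreationFails) {
    ScopeGuard guard([&] { lockFile.close(); });
    if (engineCreationFails) {
        throw std::runtime_error("engine creation failed");
    }
    guard.dismiss();  // Assumption: on success the lock file stays open.
    return true;
}
```

The point of the pattern is that the cleanup is declared once, before the risky calls, so every early exit through an exception releases the lock file without a separate catch block.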
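Code block 1: create and validate mongod.lock
createLockFile in mongo/db/storage/storage_engine_init.cpp is the key function that ensures a single MongoDB instance has exclusive access to its data directory. This is an important mechanism for protecting data integrity and for preventing several instances from accessing the same data set at the same time. The lock file is created early in data directory initialization so that everything that follows runs under exclusive access. The PID of the current mongod process is written into mongod.lock only at the end of initialization (writePid); if the file is still non-empty at the next startup, the previous shutdown is treated as unclean.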
void createLockFile(ServiceContext* service) {
    LOG(1) << "conca createLockFile";
    auto& lockFile = StorageEngineLockFile::get(service);
    try {
        lockFile.emplace(storageGlobalParams.dbpath);
    } catch (const std::exception& ex) {
        uassert(28596,
                str::stream() << "Unable to determine status of lock file in the data directory "
                              << storageGlobalParams.dbpath << ": " << ex.what(),
                false);
    }
    const bool wasUnclean = lockFile->createdByUncleanShutdown();
    const auto openStatus = lockFile->open();
    if (storageGlobalParams.readOnly && openStatus == ErrorCodes::IllegalOperation) {
        lockFile = boost::none;
    } else {
        uassertStatusOK(openStatus);
    }

    if (wasUnclean) {
        if (storageGlobalParams.readOnly) {
            severe() << "Attempted to open dbpath in readOnly mode, but the server was "
                        "previously not shut down cleanly.";
            fassertFailedNoTrace(34416);
        }
        warning() << "Detected unclean shutdown - " << lockFile->getFilespec() << " is not empty.";
        startingAfterUncleanShutdown(service) = true;
    }
}
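The warning text above ("is not empty") and the StorageEngineLockFile header comment both tie an unclean shutdown to a non-empty lock file. A minimal sketch of such a check using std::filesystem (C++17) follows. `looksLikeUncleanShutdown` and `writeLockFile` are hypothetical helpers written for this article, not MongoDB functions.

```cpp
#include <filesystem>
#include <fstream>
#include <string>
#include <system_error>

namespace fs = std::filesystem;

// Returns true when dbpath/mongod.lock exists and is non-empty, which is
// treated here as the sign of an unclean shutdown.
bool looksLikeUncleanShutdown(const fs::path& dbpath) {
    const fs::path lockPath = dbpath / "mongod.lock";
    std::error_code ec;
    if (!fs::exists(lockPath, ec) || ec) {
        return false;
    }
    const auto size = fs::file_size(lockPath, ec);
    return !ec && size > 0;
}

// Hypothetical helper for experiments: overwrites dbpath/mongod.lock.
void writeLockFile(const fs::path& dbpath, const std::string& content) {
    std::ofstream out(dbpath / "mongod.lock", std::ios::binary | std::ios::trunc);
    out << content;
}
```

With this sketch, a missing or zero-length mongod.lock counts as a clean state, while a file that still holds a PID line counts as unclean.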
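StorageEngineLockFile is declared in mongo/db/storage/storage_engine_lock_file.h, and the file name constant for mongod.lock (kLockFileBasename) is defined there as well. lockFile.emplace is a method of boost::optional (the return type of StorageEngineLockFile::get); it constructs the StorageEngineLockFile object in place inside the optional.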
constexpr StringData kLockFileBasename = "mongod.lock"_sd;

class StorageEngineLockFile {
    StorageEngineLockFile(const StorageEngineLockFile&) = delete;
    StorageEngineLockFile& operator=(const StorageEngineLockFile&) = delete;

public:
    static boost::optional<StorageEngineLockFile>& get(ServiceContext* service);

    /**
     * Checks existing lock file, if present, to see if it contains data from a previous
     * unclean shutdown. A clean shutdown should have produced a zero length lock file.
     * Uses open() to read existing lock file or create new file.
     * Uses boost::filesystem to check lock file so may throw boost::exception.
     */
    StorageEngineLockFile(const std::string& dbpath, StringData fileName = kLockFileBasename);

    virtual ~StorageEngineLockFile();
    ...
};
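mongo/db/storage/storage_engine_lock_file_windows.cpp opens mongod.lock for exclusive writing: the share mode FILE_SHARE_READ lets other processes read the file but not open it for writing.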
Status StorageEngineLockFile::open() {
    try {
        if (!boost::filesystem::exists(_dbpath)) {
            return Status(ErrorCodes::NonExistentPath,
                          str::stream() << "Data directory " << _dbpath << " not found.");
        }
    } catch (const std::exception& ex) {
        return Status(ErrorCodes::UnknownError,
                      str::stream() << "Unable to check existence of data directory " << _dbpath
                                    << ": " << ex.what());
    }

    HANDLE lockFileHandle = CreateFileW(toNativeString(_filespec.c_str()).c_str(),
                                        GENERIC_READ | GENERIC_WRITE,
                                        FILE_SHARE_READ /* only allow readers access */,
                                        NULL,
                                        OPEN_ALWAYS /* success if fh can open */,
                                        0,
                                        NULL);

    if (lockFileHandle == INVALID_HANDLE_VALUE) {
        int errorcode = GetLastError();
        if (errorcode == ERROR_ACCESS_DENIED) {
            return Status(ErrorCodes::IllegalOperation,
                          str::stream()
                              << "Attempted to create a lock file on a read-only directory: "
                              << _dbpath);
        }
        return Status(ErrorCodes::DBPathInUse,
                      str::stream() << "Unable to create/open the lock file: " << _filespec << " ("
                                    << errnoWithDescription(errorcode) << ")."
                                    << " Ensure the user executing mongod is the owner of the lock "
                                       "file and has the appropriate permissions. Also make sure "
                                       "that another mongod instance is not already running on the "
                                    << _dbpath << " directory");
    }
    _lockFileHandle->_handle = lockFileHandle;
    return Status::OK();
}
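The Windows code relies on the share mode of CreateFileW for exclusivity. For readers on Linux or macOS, a comparable effect can be obtained with an advisory flock() lock. The sketch below only illustrates that general technique; it is not taken from MongoDB's POSIX implementation, and `openExclusive` and `closeLock` are hypothetical names.

```cpp
#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

#include <string>

// Opens (or creates) the file at `path` and takes an exclusive, non-blocking
// advisory lock on it. Returns the file descriptor, or -1 on failure
// (including the case where another holder already owns the lock).
int openExclusive(const std::string& path) {
    const int fd = ::open(path.c_str(), O_RDWR | O_CREAT, 0644);
    if (fd < 0) {
        return -1;
    }
    if (::flock(fd, LOCK_EX | LOCK_NB) != 0) {
        ::close(fd);
        return -1;
    }
    return fd;
}

// Releases the lock and closes the descriptor.
void closeLock(int fd) {
    if (fd >= 0) {
        ::flock(fd, LOCK_UN);
        ::close(fd);
    }
}
```

A second `openExclusive` on the same path fails while the first descriptor is still held, which matches the DBPathInUse situation described in the error message above. Note that flock() locks are advisory: they only constrain processes that also call flock().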
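Code block 2: check the data path for existing metadata
MongoDB supports multiple storage engines (such as WiredTiger and MMAPv1), and each engine may have its own data format and management scheme. To make sure a MongoDB instance loads the storage engine that matches the data directory, MongoDB uses a metadata file, storage.bson, to record the storage engine type of the data directory.
The code of StorageEngineMetadata::getStorageEngineForPath(dbpath) and StorageEngineMetadata::forPath in mongo/db/storage/storage_engine_metadata.cpp: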
boost::optional<std::string> StorageEngineMetadata::getStorageEngineForPath(
    const std::string& dbpath) {
    if (auto metadata = StorageEngineMetadata::forPath(dbpath)) {
        return {metadata->getStorageEngine()};
    }
    return {};
}

const std::string kMetadataBasename = "storage.bson";

// static
std::unique_ptr<StorageEngineMetadata> StorageEngineMetadata::forPath(const std::string& dbpath) {
    std::unique_ptr<StorageEngineMetadata> metadata;
    if (boost::filesystem::exists(boost::filesystem::path(dbpath) / kMetadataBasename)) {
        metadata.reset(new StorageEngineMetadata(dbpath));
        Status status = metadata->read();
        if (!status.isOK()) {
            error() << "Unable to read the storage engine metadata file: " << status;
            fassertFailedNoTrace(28661);
        }
    }
    return metadata;
}
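The way initializeStorageEngine combines the engine name found in storage.bson with the engine requested at startup can be reduced to a small decision function. This is a simplified sketch with a hypothetical name, `resolveEngine`: it compares plain strings, whereas the real code compares against the factory's canonical name and only when a factory for the requested engine exists.

```cpp
#include <optional>
#include <stdexcept>
#include <string>

// Decides which engine name to use, following the branch structure of
// initializeStorageEngine. Throws when the user's explicit choice conflicts
// with the engine recorded in the data directory.
std::string resolveEngine(const std::optional<std::string>& existingEngine,
                          const std::string& requestedEngine,
                          bool engineSetByUser) {
    if (!existingEngine) {
        return requestedEngine;  // No metadata file: nothing to compare against.
    }
    if (!engineSetByUser) {
        return *existingEngine;  // Adopt the engine recorded in the metadata file.
    }
    if (requestedEngine != *existingEngine) {
        throw std::runtime_error("data files were created by '" + *existingEngine +
                                 "', but '" + requestedEngine + "' was requested");
    }
    return requestedEngine;
}
```

For example, a data directory created by one engine and started without an explicit engine option keeps using that engine, while an explicit and different option stops the startup.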
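Code block 3: look up the engine factory
getFactoryForStorageEngine is a key function in MongoDB's storage engine subsystem: given a storage engine name, it returns the corresponding factory object, or nullptr if no factory is registered under that name. The parameter StringData name is the name of the storage engine (for example "wiredTiger") used for the lookup.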
StorageEngine::Factory* getFactoryForStorageEngine(ServiceContext* service, StringData name) {
    const auto result = storageFactories(service).find(name.toString());
    if (result == storageFactories(service).end()) {
        return nullptr;
    }
    return result->second.get();
}
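storageFactories(service) returns the collection of storage engine factories attached to the given service context. It is declared as a decoration on ServiceContext: auto storageFactories = ServiceContext::declareDecoration<FactoryMap>();
The factory is then looked up in the FactoryMap by name, here wiredTiger.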
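The lookup itself is an ordinary name-to-factory map. A minimal sketch of such a registry follows; `EngineFactory`, `WiredTigerLikeFactory`, `registerFactory`, and `findFactory` are hypothetical names, and the `FactoryMap` alias here is a plain std::map chosen for illustration rather than MongoDB's actual type.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical minimal factory interface.
struct EngineFactory {
    virtual ~EngineFactory() = default;
    virtual std::string canonicalName() const = 0;
};

struct WiredTigerLikeFactory : EngineFactory {
    std::string canonicalName() const override {
        return "wiredTiger";
    }
};

using FactoryMap = std::map<std::string, std::unique_ptr<EngineFactory>>;

// Registers a factory under its canonical name.
void registerFactory(FactoryMap& factories, std::unique_ptr<EngineFactory> factory) {
    const std::string name = factory->canonicalName();
    factories[name] = std::move(factory);
}

// Returns the factory registered under `name`, or nullptr if there is none,
// mirroring the nullptr return of getFactoryForStorageEngine.
EngineFactory* findFactory(const FactoryMap& factories, const std::string& name) {
    const auto it = factories.find(name);
    return it == factories.end() ? nullptr : it->second.get();
}
```

Returning nullptr for an unknown name lets the caller decide how to report the error, as the uassert with code 18656 does in initializeStorageEngine.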
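Code block 4: create the WiredTiger engine
The create method in mongo/db/storage/wiredtiger/wiredtiger_init.cpp does three main things: (1) computes the engine cache size with WiredTigerUtil::getCacheSizeMB; (2) creates the WiredTigerKVEngine; (3) wraps it in a StorageEngineImpl and returns it.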
virtual StorageEngine* create(const StorageGlobalParams& params,
                              const StorageEngineLockFile* lockFile) const {
    if (lockFile && lockFile->createdByUncleanShutdown()) {
        warning() << "Recovering data from the last clean checkpoint.";
    }

#if defined(__linux__)
// This is from <linux/magic.h> but that isn't available on all systems.
// Note that the magic number for ext4 is the same as ext2 and ext3.
#define EXT4_SUPER_MAGIC 0xEF53
    {
        struct statfs fs_stats;
        int ret = statfs(params.dbpath.c_str(), &fs_stats);

        if (ret == 0 && fs_stats.f_type == EXT4_SUPER_MAGIC) {
            log() << startupWarningsLog;
            log() << "** WARNING: Using the XFS filesystem is strongly recommended with the "
                     "WiredTiger storage engine"
                  << startupWarningsLog;
            log() << "**          See "
                     "http://dochub.mongodb.org/core/prodnotes-filesystem"
                  << startupWarningsLog;
        }
    }
#endif

    size_t cacheMB = WiredTigerUtil::getCacheSizeMB(wiredTigerGlobalOptions.cacheSizeGB);
    log() << "conca cacheMB is " << cacheMB;
    const double memoryThresholdPercentage = 0.8;
    ProcessInfo p;
    if (p.supported()) {
        if (cacheMB > memoryThresholdPercentage * p.getMemSizeMB()) {
            log() << startupWarningsLog;
            log() << "** WARNING: The configured WiredTiger cache size is more than "
                  << memoryThresholdPercentage * 100 << "% of available RAM."
                  << startupWarningsLog;
            log() << "**          See "
                     "http://dochub.mongodb.org/core/faq-memory-diagnostics-wt"
                  << startupWarningsLog;
        }
    }
    const bool ephemeral = false;
    const auto maxCacheOverflowMB =
        static_cast<size_t>(1024 * wiredTigerGlobalOptions.maxCacheOverflowFileSizeGB);
    WiredTigerKVEngine* kv =
        new WiredTigerKVEngine(getCanonicalName().toString(),
                               params.dbpath,
                               getGlobalServiceContext()->getFastClockSource(),
                               wiredTigerGlobalOptions.engineConfig,
                               cacheMB,
                               maxCacheOverflowMB,
                               params.dur,
                               ephemeral,
                               params.repair,
                               params.readOnly);
    kv->setRecordStoreExtraOptions(wiredTigerGlobalOptions.collectionConfig);
    kv->setSortedDataInterfaceExtraOptions(wiredTigerGlobalOptions.indexConfig);
    ...
    StorageEngineOptions options;
    options.directoryPerDB = params.directoryperdb;
    options.directoryForIndexes = wiredTigerGlobalOptions.directoryForIndexes;
    options.forRepair = params.repair;
    return new StorageEngineImpl(kv, options);
}
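Two pieces of arithmetic in create() are easy to check in isolation: the 80% RAM threshold that triggers the cache size warning, and the GB-to-MB conversion used for maxCacheOverflowMB. The sketch below uses hypothetical function names, `cacheExceedsThreshold` and `gbToMB`; the 0.8 factor and the multiplication by 1024 come from the code above.

```cpp
#include <cstddef>

// Returns true when the configured cache is larger than `thresholdFraction`
// of the system memory, which is the condition for the startup warning.
bool cacheExceedsThreshold(std::size_t cacheMB,
                           std::size_t systemMemMB,
                           double thresholdFraction = 0.8) {
    return static_cast<double>(cacheMB) > thresholdFraction * static_cast<double>(systemMemMB);
}

// Converts a size in GB (possibly fractional) to whole MB, truncating any
// remainder, as the static_cast in create() does.
std::size_t gbToMB(double sizeGB) {
    return static_cast<std::size_t>(1024 * sizeGB);
}
```

On a machine with 8192 MB of RAM the threshold is 6553.6 MB, so a 4096 MB cache passes silently and a 7000 MB cache produces the warning.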
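The WiredTigerKVEngine constructor in mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp completes the initialization of the WiredTiger storage engine: journal directory handling, configuration options, opening the connection, timestamp handling, and session management, so that the engine is ready for use.
Journal directory handling: build the journal directory path; if durability is enabled, check whether the journal directory exists and create it if it does not; if creation fails, log the error and rethrow the exception.
WiredTiger configuration: set the cache size, the maximum number of sessions, and other open options.
_openWiredTiger actually opens the WiredTiger database.
Initialize the session cache (WiredTigerSessionCache) and the session sweeper (WiredTigerSessionSweeper).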
WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
                                       const std::string& path,
                                       ClockSource* cs,
                                       const std::string& extraOpenOptions,
                                       size_t cacheSizeMB,
                                       size_t maxCacheOverflowFileSizeMB,
                                       bool durable,
                                       bool ephemeral,
                                       bool repair,
                                       bool readOnly)
    : _clockSource(cs),
      _oplogManager(std::make_unique<WiredTigerOplogManager>()),
      _canonicalName(canonicalName),
      _path(path),
      _sizeStorerSyncTracker(cs, 100000, Seconds(60)),
      _durable(durable),
      _ephemeral(ephemeral),
      _inRepairMode(repair),
      _readOnly(readOnly),
      _keepDataHistory(serverGlobalParams.enableMajorityReadConcern) {
    boost::filesystem::path journalPath = path;
    journalPath /= "journal";
    if (_durable) {
        if (!boost::filesystem::exists(journalPath)) {
            try {
                boost::filesystem::create_directory(journalPath);
            } catch (std::exception& e) {
                log() << "error creating journal dir " << journalPath.string() << ' ' << e.what();
                throw;
            }
        }
    }

    _previousCheckedDropsQueued = _clockSource->now();

    std::stringstream ss;
    ss << "create,";
    ss << "cache_size=" << cacheSizeMB << "M,";
    ss << "cache_overflow=(file_max=" << maxCacheOverflowFileSizeMB << "M),";
    ss << "session_max=33000,";
    ss << "eviction=(threads_min=4,threads_max=4),";
    ss << "config_base=false,";
    ss << "statistics=(fast),";

    if (!WiredTigerSessionCache::isEngineCachingCursors()) {
        ss << "cache_cursors=false,";
    }

    // The setting may have a later setting override it if not using the journal. We make it
    // unconditional here because even nojournal may need this setting if it is a transition
    // from using the journal.
    if (!_readOnly) {
        // If we're readOnly skip all WAL-related settings.
        ss << "log=(enabled=true,archive=true,path=journal,compressor=";
        ss << wiredTigerGlobalOptions.journalCompressor << "),";
        ss << "file_manager=(close_idle_time=" << gWiredTigerFileHandleCloseIdleTime
           << ",close_scan_interval=" << gWiredTigerFileHandleCloseScanInterval
           << ",close_handle_minimum=" << gWiredTigerFileHandleCloseMinimum << "),";
        ss << "statistics_log=(wait=" << wiredTigerGlobalOptions.statisticsLogDelaySecs << "),";

        if (shouldLog(::mongo::logger::LogComponent::kStorageRecovery,
                      logger::LogSeverity::Debug(3))) {
            ss << "verbose=[recovery_progress,checkpoint_progress,compact_progress,recovery],";
        } else {
            ss << "verbose=[recovery_progress,checkpoint_progress,compact_progress],";
        }

        if (kDebugBuild) {
            // Enable debug write-ahead logging for all tables under debug build.
            ss << "debug_mode=(table_logging=true,";
            // For select debug builds, support enabling WiredTiger eviction debug mode. This uses
            // more aggressive eviction tactics, but may have a negative performance impact.
            if (gWiredTigerEvictionDebugMode) {
                ss << "eviction=true,";
            }
            ss << "),";
        }
    }

    ss << WiredTigerCustomizationHooks::get(getGlobalServiceContext())
              ->getTableCreateConfig("system");
    ss << WiredTigerExtensions::get(getGlobalServiceContext())->getOpenExtensionsConfig();
    ss << extraOpenOptions;

    if (_readOnly) {
        invariant(!_durable);
        ss << ",readonly=true,";
    }

    if (!_durable && !_readOnly) {
        // If we started without the journal, but previously used the journal then open with the
        // WT log enabled to perform any unclean shutdown recovery and then close and reopen in
        // the normal path without the journal.
        if (boost::filesystem::exists(journalPath)) {
            string config = ss.str();
            log() << "Detected WT journal files.  Running recovery from last checkpoint.";
            log() << "journal to nojournal transition config: " << config;
            int ret = wiredtiger_open(
                path.c_str(), _eventHandler.getWtEventHandler(), config.c_str(), &_conn);
            if (ret == EINVAL) {
                fassertFailedNoTrace(28717);
            } else if (ret != 0) {
                Status s(wtRCToStatus(ret));
                msgasserted(28718, s.reason());
            }
            invariantWTOK(_conn->close(_conn, nullptr));
            // After successful recovery, remove the journal directory.
            try {
                boost::filesystem::remove_all(journalPath);
            } catch (std::exception& e) {
                error() << "error removing journal dir " << journalPath.string() << ' ' << e.what();
                throw;
            }
        }
        // This setting overrides the earlier setting because it is later in the config string.
        ss << ",log=(enabled=false),";
    }

    string config = ss.str();
    log() << "wiredtiger_open config: " << config;
    _openWiredTiger(path, config);
    _eventHandler.setStartupSuccessful();
    _wtOpenConfig = config;

    {
        char buf[(2 * 8 /*bytes in hex*/) + 1 /*nul terminator*/];
        invariantWTOK(_conn->query_timestamp(_conn, buf, "get=recovery"));

        std::uint64_t tmp;
        fassert(50758, NumberParser().base(16)(buf, &tmp));
        _recoveryTimestamp = Timestamp(tmp);
        LOG_FOR_RECOVERY(0) << "WiredTiger recoveryTimestamp. Ts: " << _recoveryTimestamp;
    }

    _sessionCache.reset(new WiredTigerSessionCache(this));

    _sessionSweeper = std::make_unique<WiredTigerSessionSweeper>(_sessionCache.get());
    _sessionSweeper->go();

    // Until the Replication layer installs a real callback, prevent truncating the oplog.
    setOldestActiveTransactionTimestampCallback(
        [](Timestamp) { return StatusWith(boost::make_optional(Timestamp::min())); });

    if (!_readOnly && !_ephemeral) {
        if (!_recoveryTimestamp.isNull()) {
            setInitialDataTimestamp(_recoveryTimestamp);
            setOldestTimestamp(_recoveryTimestamp, false);
            setStableTimestamp(_recoveryTimestamp, false);
        }
    }

    if (_ephemeral && !getTestCommandsEnabled()) {
        // We do not maintain any snapshot history for the ephemeral storage engine in production
        // because replication and sharded transactions do not currently run on the inMemory engine.
        // It is live in testing, however.
        snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.store(0);
    }

    _sizeStorerUri = _uri("sizeStorer");
    WiredTigerSession session(_conn);
    if (!_readOnly && repair && _hasUri(session.getSession(), _sizeStorerUri)) {
        log() << "Repairing size cache";

        auto status = _salvageIfNeeded(_sizeStorerUri.c_str());
        if (status.code() != ErrorCodes::DataModifiedByRepair)
            fassertNoTrace(28577, status);
    }

    _sizeStorer = std::make_unique<WiredTigerSizeStorer>(_conn, _sizeStorerUri, _readOnly);

    Locker::setGlobalThrottling(&openReadTransaction, &openWriteTransaction);
}
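The constructor assembles the wiredtiger_open configuration as one comma-separated string and relies on a later `log=(enabled=false)` overriding the earlier `log=(enabled=true,...)`. A reduced sketch of that string building follows. `OpenOptions` and `buildOpenConfig` are hypothetical names, the "snappy" compressor default is an assumed value for illustration, and only a few of the real settings are reproduced.

```cpp
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical subset of the inputs the constructor uses.
struct OpenOptions {
    std::size_t cacheSizeMB = 1024;
    bool durable = true;
    bool readOnly = false;
    std::string journalCompressor = "snappy";  // Assumed value for illustration.
};

// Builds a reduced version of the configuration string in the same order as
// the constructor: base settings, journal settings, then overrides.
std::string buildOpenConfig(const OpenOptions& opts) {
    std::ostringstream ss;
    ss << "create,";
    ss << "cache_size=" << opts.cacheSizeMB << "M,";
    ss << "session_max=33000,";
    if (!opts.readOnly) {
        // Journal (write-ahead log) settings are skipped in read-only mode.
        ss << "log=(enabled=true,archive=true,path=journal,compressor="
           << opts.journalCompressor << "),";
    }
    if (opts.readOnly) {
        ss << "readonly=true,";
    }
    if (!opts.durable && !opts.readOnly) {
        // Appended last so that it overrides the earlier log setting.
        ss << "log=(enabled=false),";
    }
    return ss.str();
}
```

With `durable = false` the resulting string contains both log settings, and the disabling one comes later, which is the override the source comment describes.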
Code block 5: finally, write the process ID into mongod.lock: lockFile->writePid();
The writePid code in mongo/db/storage/storage_engine_lock_file.cpp is as follows:
Status StorageEngineLockFile::writePid() {
    ProcessId pid = ProcessId::getCurrent();
    std::stringstream ss;
    ss << pid << std::endl;
    std::string pidStr = ss.str();

    return writeString(pidStr);
}
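writePid formats the PID as a decimal number followed by a newline and hands the string to writeString. The sketch below reproduces that formatting with the standard library only. `formatPid` and `writeStringToFile` are hypothetical stand-ins; in particular, `writeStringToFile` simply overwrites the file and does not claim to match what MongoDB's writeString does internally.

```cpp
#include <fstream>
#include <sstream>
#include <string>

// Formats a process ID the way writePid does: the number plus a newline.
std::string formatPid(long pid) {
    std::ostringstream ss;
    ss << pid << std::endl;
    return ss.str();
}

// Hypothetical stand-in for writeString: overwrites the file at `path` with
// `str` and reports whether the stream is still in a good state.
bool writeStringToFile(const std::string& path, const std::string& str) {
    std::ofstream out(path, std::ios::binary | std::ios::trunc);
    out << str;
    out.flush();
    return out.good();
}
```

After a successful call the lock file is non-empty, which is exactly the state that a later startup interprets as an unclean shutdown if the file was not emptied again by a clean exit.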