在本系列的第一篇文章(主函数入口)中,介绍了mongodb会在系统启动的同时初始化日志持久化服务。该功能貌似是在1.7版本之后引入到系统中的,主要用于解决系统宕机时,内存中的数据尚未写入磁盘而造成的数据丢失问题。其机制主要是通过日志(journal)方式,定时将操作日志(如CUD操作等)记录到db的journal文件夹下,这样当系统再次启动时,就可以从该文件夹下恢复丢失的(内存)数据。对应的代码就是_initAndListen()函数体(db.cpp文件第511行)中下面这一行:
dur::startup();
在Mongodb中,与持久化相关的源文件(及其中的类)一般都以dur开头,比如下面几个:
dur.cpp:封装持久化主要方法和实现,以便外部使用
dur_commitjob.cpp:持久化任务工作(单元),封装延时队列TaskQueue<D>,操作集合vector<shared_ptr<DurOp>>等
dur_journal.cpp:提供日志文件/路径,创建,遍历等操作
dur_journalformat.h:日志文件格式定义
dur_preplogbuffer.cpp:构造用于输出的日志buffer
dur_recover.h:日志恢复类(后台任务方式BackgroundJob)
dur_stats.h:统计类,包括提交/同步数据次数等
dur_writetodatafiles.cpp:封装写入数据文件mongofile方法
durop.h:持久化操作类,提供序列化,创建操作(FileCreatedOp),DROP操作(DropDbOp)
首先我们看一下dur::startup()方法实现(dur.cpp),如下:
/** at startup, recover, and then start the journal threads */
void startup() { if( !cmdLine.dur ) /*判断命令行启动参数是否为持久化*/ return; DurableInterface::enableDurability();//对持久化变量 _impl 设置为DurableImpl方式 journalMakeDir();/*构造日志文件所要存储的路径:dur_journal.cpp*/ try { recover(); /*从上一次系统crash中恢复数据日志信息:dur_recover.cpp*/ } catch(...) { log() << "exception during recovery" << endl; throw; } preallocateFiles(); boost::thread t(durThread); }
/**
 * Abstract durability interface; every operation shown here is pure virtual.
 * This is an excerpt: the `....` marker stands for members elided by the
 * article, so the listing is not the complete class.
 */
class DurableInterface : boost::noncopyable {
    /* Declares a write of len bytes at x and returns the pointer to write through. */
    virtual void* writingPtr(void *x, unsigned len) = 0;
    /* Notification hook for a created file (semantics not shown in this excerpt). */
    virtual void createdFile(string filename, unsigned long long len) = 0;
    /* Declares the intent to write len bytes starting at x. */
    virtual void declareWriteIntent(void *x, unsigned len) = 0;
    /* Offset-based variant: buf plus ofs and len (semantics not shown in this excerpt). */
    virtual void* writingAtOffset(void *buf, unsigned ofs, unsigned len) = 0;
    ....
}
class NonDurableImpl : public DurableInterface{ /* non-durable variant: temporary, memory-based storage */
} class DurableImpl : public DurableInterface { /* durable variant: supports on-disk storage */ }
void durThread() {
Client::initThread("dur"); const int HowOftenToGroupCommitMs = 90;/*多少时间提交一组信息,单位:毫秒*/ //注:commitJob对象用于封装并执行提交一组操作 while( !inShutdown() ) { sleepmillis(10); CodeBlock::Within w(durThreadMain);/*定义代码块锁,该设计很讨巧,接下来会介绍*/ try { int millis = HowOftenToGroupCommitMs; { stats.rotate();//统计最新的_lastRotate信息 { Timer t;/*声明定时器*/ /*遍历日志文件夹下的文件并更新文件的“最新更新时间”标志位并移除无效或关闭之前使用的日志文件:dur_journal.cpp*/ journalRotate(); millis -= t.millis();/*线程睡眠时间为90减去遍历时间*/ assert( millis <= HowOftenToGroupCommitMs ); if( millis < 5 ) millis = 5; } // we do this in a couple blocks, which makes it a tiny bit faster (only a little) on throughput, // but is likely also less spiky on our cpu usage, which is good: sleepmillis(millis/2); //从commitJob的defer任务队列中获取任务并执行,详情参见: taskqueue.h的invoke() 和 dur_commitjob.cpp 的 // Writes::D::go(const Writes::D& d)方法(用于非延迟写入信息操作) commitJob.wi()._deferred.invoke(); sleepmillis(millis/2); //按mongodb开发者的理解,通过将休眠时间减少一半(millis/2)并紧跟着继续从队列中取任务, //以此小幅提升读取队列系统的吞吐量 commitJob.wi()._deferred.invoke(); } go(); //执行提交一组信息操作 } catch(std::exception& e) {/*服务如果突然crash*/ log() << "exception in durThread causing immediate shutdown: " << e.what() << endl; abort(); // based on myTerminate() } } cc().shutdown();//关闭当前线程,Client::initThread("dur") }
static void go() {
if( !commitJob.hasWritten() ){ /*hasWritten一般在CUD操作时会变为true,后面会加以介绍*/ commitJob.notifyCommitted();/*发送信息已存储到磁盘的通知*/ return; } { readlocktry lk("", 1000);/*声明读锁*/ if( lk.got() ) { groupCommit();/*提交一组操作*/ return; } } // 当未取到读锁时,可能获取读锁比较慢,则直接使用写锁,不过写锁会用更多的RAM writelock lk; groupCommit(); }
/** locking: in read lock when called. */
static void _groupCommit() {
    stats.curr->_commits++; /* one more commit */
    ......
    // Per the original note: prepares the page-aligned journal buffer by serializing what
    // commitJob.ops() returns (a vector< shared_ptr<DurOp> >) into commitJob._ab for the
    // calls below; implemented by _PREPLOGBUFFER() in dur_preplogbuffer.cpp.
    PREPLOGBUFFER();

    // todo : write to the journal outside locks, as this write can be slow.
    //        however, be careful then about remapprivateview as that cannot be done
    //        if new writes are then pending in the private maps.
    WRITETOJOURNAL(commitJob._ab); /* writes the journal data; ends up in Journal::journal(const AlignedBuilder& b) in dur_journal.cpp */

    // data is now in the journal, which is sufficient for acknowledging getLastError.
    // (ok to crash after that)
    commitJob.notifyCommitted();

    WRITETODATAFILES(); /* write the data to the mongofile data files */

    commitJob.reset(); /* reset the current commit job */

    // REMAPPRIVATEVIEW
    // remapping the private view must happen after WRITETODATAFILES, otherwise the newly
    // written data could not be read back
    DEV assert( !commitJob.hasWritten() );
    if( !dbMutex.isWriteLocked() ) {
        // this needs done in a write lock (as there is a short window during remapping when each view
        // might not exist) thus we do it on the next acquisition of that instead of here (there is no
        // rush if you aren't writing anyway -- but it must happen, if it is done, before any uncommitted
        // writes occur). If desired, perhaps this can be eliminated on posix as it may be that the remap
        // is race-free there.
        //
        dbMutex._remapPrivateViewRequested = true;
    }
    else {
        stats.curr->_commitsInWriteLock++;
        // however, if we are already write locked, we must do it now -- up the call tree someone
        // may do a write without a new lock acquisition. this can happen when MongoMMF::close() calls
        // this method when a file (and its views) is about to go away.
        //
        REMAPPRIVATEVIEW();
    }
}
DiskLoc DataFileMgr::insert(const char *ns, const void *obuf, int len, bool god, const BSONElement &writeId, bool mayAddIndex) {
...... r = (Record*) getDur().writingPtr(r, lenWHdr);//位于1588行
/** Declares the intent to write len bytes at x and returns that same pointer. */
void* DurableImpl::writingPtr(void *x, unsigned len) {
    void *p = x;
    declareWriteIntent(p, len);
    return p;
}

/** Forwards the write intent (p, len) to commitJob. */
void DurableImpl::declareWriteIntent(void *p, unsigned len) {
    commitJob.note(p, len);
}

/**
 * Notes a pending write of len bytes at p. Excerpt: each `......` marker
 * stands for code elided by the article, and the closing brace of the
 * function is not part of the listing.
 */
void CommitJob::note(void* p, int len) {
    DEV dbMutex.assertWriteLocked();
    dassert( cmdLine.dur );
    // Only act when checkAndSet() reports that (p, len) was not noted already.
    if( !_wi._alreadyNoted.checkAndSet(p, len) ) {
        MemoryMappedFile::makeWritable(p, len); /* per the original note: marks the mmap'ed region as writable */
        if( !_hasWritten ) {
            assert( !dbMutex._remapPrivateViewRequested );
            // set the has-written flag; the commit path checks it via commitJob.hasWritten()
            _hasWritten = true;
        }
        ......
        // add the write intent to the deferred task queue
        _wi.insertWriteIntent(p, len);
        wassert( _wi._writes.size() < 2000000 );
        assert( _wi._writes.size() < 20000000 );
        ......
    }
void insertWriteIntent(void* p, int len) {
D d; d.p = p;/*操作记录record类型*/ d.len = len;/*记录长度*/ _deferred.defer(d);/*延期任务队列:TaskQueue<D>类型*/ }
CodeBlock::Within w(durThreadMain);
class CodeBlock {
volatile int n; unsigned tid; void fail() { log() << "synchronization (race condition) failure" << endl; printStackTrace(); abort();/**/ } void enter() { if( ++n != 1 ) fail(); /*当已有线程执行该代码块时,则执行fail*/ #if defined(_WIN32) tid = GetCurrentThreadId(); #endif } void leave() { /*只有调用 leave 操作,才会--n,即在线程执行完该代码块时调用*/ if( --n != 0 ) fail(); } public: CodeBlock() : n(0) { } class Within { CodeBlock& _s; public: Within(CodeBlock& s) : _s(s) { _s.enter(); } ~Within() { _s.leave(); } }; void assertWithin() { assert( n == 1 ); #if defined(_WIN32) assert( GetCurrentThreadId() == tid ); #endif } }; #else
参考链接:http://www.infoq.com/cn/news/2011/03/MongoDB-1.8
原文链接:http://www.cnblogs.com/daizhj/archive/2011/03/21/1990344.html
作者: daizhj, 代震军 微博: http://t.sina.com.cn/daizhj Tags: mongodb,c++,source code (责任编辑:IT) |