gdb ./test/parallel_buffer_pool_manager_test
b parallel_buffer_pool_manager_test.cpp:69
r
#69 EXPECT_EQ(true, bpm->UnpinPage(i, true));
p bpm->instances_[0]->replacer_->Size()
#0
n
#70 bpm->FlushPage(i);
p bpm->instances_[0]->replacer_->Size()
#1
n
#68 for (int i = 0; i < 5; ++i) {
p bpm->instances_[0]->replacer_->Size()
#0
// test_build显示的报错信息
// #RoundRobinNewPage
//   Expected equality of these values:
//     unpin_page + num_instances
//       Which is: 11
//     page_id_temp
//       Which is: 13
// #test_memory_safety
//   Timeout Happened during valgrind
// #ParallelBufferPoolManager_HardTestD
//   这个好像就是我调试时用的printf语句没删去,执行太慢所以报错了
/** * LRUReplacer implements the Least Recently Used replacement policy. */ classLRUReplacer : public Replacer { public: /** * Create a new LRUReplacer. * @param num_pages the maximum number of pages the LRUReplacer will be required to store */ explicitLRUReplacer(size_t num_pages);
/** * Destroys the LRUReplacer. */ ~LRUReplacer() override;
/** * BufferPoolManager reads disk pages to and from its internal buffer pool. */ classBufferPoolManagerInstance : public BufferPoolManager { public: /** * Creates a new BufferPoolManagerInstance. * @param pool_size the size of the buffer pool * @param disk_manager the disk manager * @param log_manager the log manager (for testing only: nullptr = disable logging) */ BufferPoolManagerInstance(size_t pool_size, DiskManager *disk_manager, LogManager *log_manager = nullptr); /** * Creates a new BufferPoolManagerInstance. * @param pool_size the size of the buffer pool * @param num_instances total number of BPIs in parallel BPM * @param instance_index index of this BPI in the parallel BPM * @param disk_manager the disk manager * @param log_manager the log manager (for testing only: nullptr = disable logging) */ BufferPoolManagerInstance(size_t pool_size, uint32_t num_instances, uint32_t instance_index, DiskManager *disk_manager, LogManager *log_manager = nullptr);
/** * Destroys an existing BufferPoolManagerInstance. */ ~BufferPoolManagerInstance() override;
/** @return size of the buffer pool */ size_tGetPoolSize()override{ return pool_size_; }
/** @return pointer to all the pages in the buffer pool */ Page *GetPages(){ return pages_; }
protected: /** * Fetch the requested page from the buffer pool. * @param page_id id of page to be fetched * @return the requested page */ Page *FetchPgImp(page_id_t page_id)override;
/** * Unpin the target page from the buffer pool. * @param page_id id of page to be unpinned * @param is_dirty true if the page should be marked as dirty, false otherwise * @return false if the page pin count is <= 0 before this call, true otherwise */ boolUnpinPgImp(page_id_t page_id, bool is_dirty)override;
/** * Flushes the target page to disk. * @param page_id id of page to be flushed, cannot be INVALID_PAGE_ID * @return false if the page could not be found in the page table, true otherwise */ boolFlushPgImp(page_id_t page_id)override;
/** * Creates a new page in the buffer pool. * @param[out] page_id id of created page * @return nullptr if no new pages could be created, otherwise pointer to new page */ Page *NewPgImp(page_id_t *page_id)override;
/** * Deletes a page from the buffer pool. * @param page_id id of page to be deleted * @return false if the page exists but could not be deleted, true if the page didn't exist or deletion succeeded */ boolDeletePgImp(page_id_t page_id)override;
/** * Flushes all the pages in the buffer pool to disk. */ voidFlushAllPgsImp()override;
/** * Allocate a page on disk.∂ * @return the id of the allocated page */ page_id_tAllocatePage();
/** * Deallocate a page on disk. * @param page_id id of the page to deallocate */ voidDeallocatePage(__attribute__((unused)) page_id_t page_id){ // This is a no-nop right now without a more complex data structure to track deallocated pages }
/** * Validate that the page_id being used is accessible to this BPI. This can be used in all of the functions to * validate input data and ensure that a parallel BPM is routing requests to the correct BPI * @param page_id */ voidValidatePageId(page_id_t page_id)const;
/** Number of pages in the buffer pool. */ constsize_t pool_size_; /** How many instances are in the parallel BPM (if present, otherwise just 1 BPI) */ constuint32_t num_instances_ = 1; /** Index of this BPI in the parallel BPM (if present, otherwise just 0) */ constuint32_t instance_index_ = 0; /** Each BPI maintains its own counter for page_ids to hand out, must ensure they mod back to its instance_index_ */ std::atomic<page_id_t> next_page_id_ = instance_index_;
/** Array of buffer pool pages. */ Page *pages_; /** Pointer to the disk manager. */ DiskManager *disk_manager_ __attribute__((__unused__)); /** Pointer to the log manager. */ LogManager *log_manager_ __attribute__((__unused__)); /** Page table for keepi__attribute__ng track of buffer pool pages. */ std::unordered_map<page_id_t, frame_id_t> page_table_; /** Replacer to find unpinned pages for replacement. */ Replacer *replacer_; /** List of free pages. */ std::list<frame_id_t> free_list_; /** This latch protects shared data structures. We recommend updating this comment to describe what it protects. */ std::mutex latch_; }; } // namespace bustub
/**
 * Creates a BufferPoolManagerInstance that may be one of several instances in a parallel buffer pool.
 *
 * Allocates the frame array and an LRU replacer sized to the pool, and places every frame on the free
 * list. The page-id counter starts at instance_index.
 *
 * @param pool_size number of frames in this instance's buffer pool
 * @param num_instances total number of instances in the parallel pool; must be greater than 0
 * @param instance_index index of this instance in the parallel pool; must be less than num_instances
 * @param disk_manager the disk manager
 * @param log_manager the log manager
 */
BufferPoolManagerInstance::BufferPoolManagerInstance(size_t pool_size, uint32_t num_instances, uint32_t instance_index,
                                                     DiskManager *disk_manager, LogManager *log_manager)
    : pool_size_(pool_size),
      num_instances_(num_instances),
      instance_index_(instance_index),
      next_page_id_(instance_index),
      disk_manager_(disk_manager),
      log_manager_(log_manager) {
  BUSTUB_ASSERT(num_instances > 0, "If BPI is not part of a pool, then the pool size should just be 1");
  // With a single instance the only index that satisfies the check below is 0.
  BUSTUB_ASSERT(
      instance_index < num_instances,
      "BPI index cannot be greater than the number of BPIs in the pool. In non-parallel case, index should just be 0.");

  // We allocate a consecutive memory space for the buffer pool.
  pages_ = new Page[pool_size_];
  replacer_ = new LRUReplacer(pool_size);

  // Initially, every page is in the free list.
  for (size_t i = 0; i < pool_size_; ++i) {
    free_list_.emplace_back(static_cast<frame_id_t>(i));
  }
}
voidBufferPoolManagerInstance::FlushAllPgsImp(){ // You can do it! std::lock_guard<std::mutex> lock(latch_); // 加锁 auto iter = page_table_.begin(); while (iter != page_table_.end()) { page_id_t page_id = iter->first; frame_id_t frame_id = page_table_[page_id]; if (pages_[frame_id].IsDirty()) { disk_manager_->WritePage(page_id, pages_[frame_id].data_); pages_[frame_id].is_dirty_ = false; } } }
Page *BufferPoolManagerInstance::NewPgImp(page_id_t *page_id){ // 0. Make sure you call AllocatePage! // 1. If all the pages in the buffer pool are pinned, return nullptr. // 2. Pick a victim page P from either the free list or the replacer. Always pick from the free list first. // 3. Update P's metadata, zero out memory and add P to the page table. // 4. Set the page ID output parameter. Return a pointer to P. frame_id_t frame_id; std::lock_guard<std::mutex> lock(latch_); // 加锁
if (!free_list_.empty()) { // 存在空余页 frame_id = free_list_.back(); free_list_.pop_back(); } else { // 需根据LRU算法淘汰一页 bool res = replacer_->Victim(&frame_id); if (!res) { returnnullptr; // 淘汰失败 } // 在这里调用disk_manager_->WritePage方法实际上破坏了API的一致性,但FlushPgImp函数需解锁,比较麻烦 if (pages_[frame_id].IsDirty()) { disk_manager_->WritePage(pages_[frame_id].page_id_, pages_[frame_id].data_); } page_table_.erase(pages_[frame_id].page_id_); // 在page_table中删除该frame对应的页 }
Page *BufferPoolManagerInstance::FetchPgImp(page_id_t page_id){ // 1. Search the page table for the requested page (P). // 1.1 If P exists, pin it and return it immediately. // 1.2 If P does not exist, find a replacement page (R) from either the free list or the replacer. // Note that pages are always found from the free list first. // 2. If R is dirty, write it back to the disk. // 3. Delete R from the page table and insert P. // 4. Update P's metadata, read in the page content from disk, and then return a pointer to P. frame_id_t frame_id; std::lock_guard<std::mutex> lock(latch_); // 加锁
auto iter = page_table_.find(page_id); if (iter != page_table_.end()) { // 原先就在buffer里 frame_id = page_table_[page_id]; if (pages_[frame_id].pin_count_ == 0) { replacer_->Pin(frame_id); // 只有pin_count为0才有可能在replacer里 } pages_[frame_id].pin_count_++; return &pages_[frame_id]; } if (!free_list_.empty()) { // 存在空余页 frame_id = free_list_.back(); free_list_.pop_back(); } else { // 需根据LRU算法淘汰一页 bool res = replacer_->Victim(&frame_id); if (!res) { returnnullptr; // 淘汰失败 } if (pages_[frame_id].IsDirty()) { disk_manager_->WritePage(pages_[frame_id].page_id_, pages_[frame_id].data_); } page_table_.erase(pages_[frame_id].page_id_); // 在page_table中删除该frame对应的页 } page_table_[page_id] = frame_id;
boolBufferPoolManagerInstance::DeletePgImp(page_id_t page_id){ // 0. Make sure you call DeallocatePage! // 1. Search the page table for the requested page (P). // 1. If P does not exist, return true. // 2. If P exists, but has a non-zero pin-count, return false. Someone is using the page. // 3. Otherwise, P can be deleted. Remove P from the page table, reset its metadata and return it to the free list. frame_id_t frame_id; std::lock_guard<std::mutex> lock(latch_); // 加锁 if (page_table_.find(page_id) == page_table_.end()) { returntrue; }
voidBufferPoolManagerInstance::ValidatePageId(constpage_id_t page_id)const{ assert(page_id % num_instances_ == instance_index_); // allocated pages mod back to this BPI }
classParallelBufferPoolManager : public BufferPoolManager { public: /** * Creates a new ParallelBufferPoolManager. * @param the number of individual BufferPoolManagerInstances to store * @param pool_size the pool size of each BufferPoolManagerInstance * @param disk_manager the disk manager * @param log_manager the log manager (for testing only: nullptr = disable logging) */ ParallelBufferPoolManager(size_t num_instances, size_t pool_size, DiskManager *disk_manager, LogManager *log_manager = nullptr);
/** * Destroys an existing ParallelBufferPoolManager. */ ~ParallelBufferPoolManager() override;
/** @return size of the buffer pool */ size_tGetPoolSize()override;
protected: /** * @param page_id id of page * @return pointer to the BufferPoolManager responsible for handling given page id */ BufferPoolManager *GetBufferPoolManager(page_id_t page_id);
/** * Fetch the requested page from the buffer pool. * @param page_id id of page to be fetched * @return the requested page */ Page *FetchPgImp(page_id_t page_id)override;
/** * Unpin the target page from the buffer pool. * @param page_id id of page to be unpinned * @param is_dirty true if the page should be marked as dirty, false otherwise * @return false if the page pin count is <= 0 before this call, true otherwise */ boolUnpinPgImp(page_id_t page_id, bool is_dirty)override;
/** * Flushes the target page to disk. * @param page_id id of page to be flushed, cannot be INVALID_PAGE_ID * @return false if the page could not be found in the page table, true otherwise */ boolFlushPgImp(page_id_t page_id)override;
/** * Creates a new page in the buffer pool. * @param[out] page_id id of created page * @return nullptr if no new pages could be created, otherwise pointer to new page */ Page *NewPgImp(page_id_t *page_id)override;
/** * Deletes a page from the buffer pool. * @param page_id id of page to be deleted * @return false if the page exists but could not be deleted, true if the page didn't exist or deletion succeeded */ boolDeletePgImp(page_id_t page_id)override;
/** * Flushes all the pages in the buffer pool to disk. */ voidFlushAllPgsImp()override;
/**
 * Creates a new ParallelBufferPoolManager.
 *
 * Allocates an array of num_instances pointers and constructs one BufferPoolManagerInstance per slot,
 * passing slot i as that instance's index. The round-robin start index begins at 0.
 *
 * @param num_instances the number of individual BufferPoolManagerInstances to store
 * @param pool_size the pool size of each BufferPoolManagerInstance
 * @param disk_manager the disk manager
 * @param log_manager the log manager
 */
ParallelBufferPoolManager::ParallelBufferPoolManager(size_t num_instances, size_t pool_size, DiskManager *disk_manager,
                                                     LogManager *log_manager) {
  // Allocate and create individual BufferPoolManagerInstances
  num_instances_ = num_instances;
  pool_size_ = pool_size;
  start_index_ = 0;
  instances_ = new BufferPoolManagerInstance *[num_instances];
  for (size_t i = 0; i < num_instances; i++) {
    // The instance constructor takes 32-bit counts; spell the narrowing out instead of leaving it implicit.
    instances_[i] = new BufferPoolManagerInstance(pool_size, static_cast<uint32_t>(num_instances),
                                                  static_cast<uint32_t>(i), disk_manager, log_manager);
  }
}
// Update constructor to destruct all BufferPoolManagerInstances and deallocate any associated memory ParallelBufferPoolManager::~ParallelBufferPoolManager() { for (size_t i = 0; i < num_instances_; i++) { // 释放指针所指对象空间 delete instances_[i]; } delete[] instances_; // 释放指针空间 }
size_tParallelBufferPoolManager::GetPoolSize(){ // Get size of all BufferPoolManagerInstances return pool_size_ * num_instances_; }
/**
 * Picks the instance responsible for a page id by taking the id modulo the number of instances.
 * Other methods can use this to route a request to the right instance.
 * @param page_id id of page
 * @return pointer to the BufferPoolManager responsible for handling given page id
 */
BufferPoolManager *ParallelBufferPoolManager::GetBufferPoolManager(page_id_t page_id) {
  const auto owner_slot = page_id % num_instances_;
  return instances_[owner_slot];
}
/**
 * Fetch the requested page by forwarding the call to the instance responsible for page_id.
 * @param page_id id of page to be fetched
 * @return whatever the responsible instance's FetchPage returns for this id
 */
Page *ParallelBufferPoolManager::FetchPgImp(page_id_t page_id) {
  return GetBufferPoolManager(page_id)->FetchPage(page_id);
}
/**
 * Creates a new page, asking the underlying instances in round-robin order.
 *
 * Probing starts at start_index_ and moves forward (wrapping around) until one instance returns a
 * page or every instance has been asked once. Whether or not a page was obtained, start_index_ is
 * then advanced by one (mod the number of instances) so the next call begins at a different instance.
 *
 * @param[out] page_id id of created page
 * @return nullptr if no instance could create a page, otherwise pointer to the new page
 */
Page *ParallelBufferPoolManager::NewPgImp(page_id_t *page_id) {
  const size_t first = start_index_;
  Page *created = nullptr;

  for (size_t attempt = 0; attempt < num_instances_; ++attempt) {
    const size_t slot = (first + attempt) % num_instances_;
    created = instances_[slot]->NewPage(page_id);
    if (created != nullptr) {
      break;  // first instance with room wins
    }
  }

  start_index_ = (first + 1) % num_instances_;
  return created;
}
voidParallelBufferPoolManager::FlushAllPgsImp(){ // flush all pages from all BufferPoolManagerInstances for (size_t i = 0; i < num_instances_; i++) { instances_[i]->FlushAllPages(); } }