Skip to content

Commit

Permalink
Merge pull request #505 from yvxiang/wk_ptr
Browse files Browse the repository at this point in the history
Use weak_ptr in callback #480
  • Loading branch information
bluebore authored Oct 9, 2016
2 parents 07150c8 + 76ac3a2 commit cf1b34c
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 20 deletions.
75 changes: 61 additions & 14 deletions src/sdk/file_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ int32_t FileImpl::AddBlock() {
const std::string& addr = block_for_write_->chains(i).address();
rpc_client_->GetStub(addr, &chunkservers_[addr]);
write_windows_[addr] = new common::SlidingWindow<int>(100,
boost::bind(&FileImpl::OnWriteCommit, this, _1, _2));
boost::bind(&FileImpl::OnWriteCommit, _1, _2));
cs_errors_[addr] = false;
WriteBlockRequest create_request;
int64_t seq = common::timer::get_micros();
Expand Down Expand Up @@ -433,7 +433,7 @@ void FileImpl::StartWrite() {
block_for_write_->set_block_size(block_for_write_->block_size() + write_buf_->Size());
write_buf_ = NULL;
boost::function<void ()> task =
boost::bind(&FileImpl::BackgroundWrite, this);
boost::bind(&FileImpl::BackgroundWrite, boost::weak_ptr<FileImpl>(shared_from_this()));
common::atomic_inc(&back_writing_);
mu_.Unlock();
thread_pool_->AddTask(task);
Expand All @@ -456,7 +456,16 @@ bool FileImpl::CheckWriteWindows() {
}

/// Send local buffer to chunkserver
void FileImpl::BackgroundWrite() {
void FileImpl::BackgroundWrite(boost::weak_ptr<FileImpl> wk_fp) {
boost::shared_ptr<FileImpl> fp(wk_fp.lock());
if (!fp) {
LOG(DEBUG, "FileImpl has been destroied, ignore backgroud write");
return;
}
fp->BackgroundWriteInternal();
}

void FileImpl::BackgroundWriteInternal() {
MutexLock lock(&mu_, "BackgroundWrite", 1000);
while(!write_queue_.empty() && CheckWriteWindows()) {
WriteBuffer* buffer = write_queue_.top();
Expand Down Expand Up @@ -499,16 +508,19 @@ void FileImpl::BackgroundWrite() {
const int max_retry_times = FLAGS_sdk_write_retry_times;
ChunkServer_Stub* stub = chunkservers_[cs_addr];
boost::function<void (const WriteBlockRequest*, WriteBlockResponse*, bool, int)> callback
= boost::bind(&FileImpl::WriteBlockCallback, this, _1, _2, _3, _4,
= boost::bind(&FileImpl::WriteBlockCallback,
boost::weak_ptr<FileImpl>(shared_from_this()),
_1, _2, _3, _4,
max_retry_times, buffer, cs_addr);

LOG(DEBUG, "BackgroundWrite start [bid:%ld, seq:%d, offset:%ld, len:%d]\n",
buffer->block_id(), buffer->Sequence(), buffer->offset(), buffer->Size());
common::atomic_inc(&back_writing_);
if (delay) {
thread_pool_->DelayTask(5,
boost::bind(&FileImpl::DelayWriteChunk, this, buffer,
request, max_retry_times, cs_addr));
boost::bind(&FileImpl::DelayWriteChunk,
boost::weak_ptr<FileImpl>(shared_from_this()),
buffer, request, max_retry_times, cs_addr));
} else {
WriteBlockResponse* response = new WriteBlockResponse;
rpc_client_->AsyncRequest(stub, &ChunkServer_Stub::WriteBlock,
Expand All @@ -523,12 +535,28 @@ void FileImpl::BackgroundWrite() {
}
}

void FileImpl::DelayWriteChunk(WriteBuffer* buffer,
const WriteBlockRequest* request,
int retry_times, std::string cs_addr) {
void FileImpl::DelayWriteChunk(boost::weak_ptr<FileImpl> wk_fp,
WriteBuffer* buffer,
const WriteBlockRequest* request,
int retry_times, std::string cs_addr) {
boost::shared_ptr<FileImpl> fp(wk_fp.lock());
if (!fp) {
LOG(DEBUG, "FileImpl has been destroied, ignore delay write");
buffer->DecRef();
delete request;
return;
}
fp->DelayWriteChunkInternal(buffer, request, retry_times, cs_addr);
}

void FileImpl::DelayWriteChunkInternal(WriteBuffer* buffer,
const WriteBlockRequest* request,
int retry_times, std::string cs_addr) {
WriteBlockResponse* response = new WriteBlockResponse;
boost::function<void (const WriteBlockRequest*, WriteBlockResponse*, bool, int)> callback
= boost::bind(&FileImpl::WriteBlockCallback, this, _1, _2, _3, _4,
= boost::bind(&FileImpl::WriteBlockCallback,
boost::weak_ptr<FileImpl>(shared_from_this()),
_1, _2, _3, _4,
retry_times, buffer, cs_addr);
common::atomic_inc(&back_writing_);
ChunkServer_Stub* stub = chunkservers_[cs_addr];
Expand All @@ -541,7 +569,25 @@ void FileImpl::DelayWriteChunk(WriteBuffer* buffer,
}
}

void FileImpl::WriteBlockCallback(const WriteBlockRequest* request,
void FileImpl::WriteBlockCallback(boost::weak_ptr<FileImpl> wk_fp,
const WriteBlockRequest* request,
WriteBlockResponse* response,
bool failed, int error,
int retry_times,
WriteBuffer* buffer,
std::string cs_addr) {
boost::shared_ptr<FileImpl> fp(wk_fp.lock());
if (!fp) {
LOG(DEBUG, "FileImpl has been destroied, ignore this callback");
buffer->DecRef();
delete request;
delete response;
return;
}
fp->WriteBlockCallbackInternal(request, response, failed, error, retry_times, buffer, cs_addr);
}

void FileImpl::WriteBlockCallbackInternal(const WriteBlockRequest* request,
WriteBlockResponse* response,
bool failed, int error,
int retry_times,
Expand Down Expand Up @@ -590,8 +636,9 @@ void FileImpl::WriteBlockCallback(const WriteBlockRequest* request,
if (!bg_error_ && retry_times > 0) {
common::atomic_inc(&back_writing_);
thread_pool_->DelayTask(5000,
boost::bind(&FileImpl::DelayWriteChunk, this, buffer,
request, retry_times, cs_addr));
boost::bind(&FileImpl::DelayWriteChunk,
boost::weak_ptr<FileImpl>(shared_from_this()),
buffer, request, retry_times, cs_addr));
} else {
buffer->DecRef();
delete request;
Expand Down Expand Up @@ -624,7 +671,7 @@ void FileImpl::WriteBlockCallback(const WriteBlockRequest* request,
}

boost::function<void ()> task =
boost::bind(&FileImpl::BackgroundWrite, this);
boost::bind(&FileImpl::BackgroundWrite, boost::weak_ptr<FileImpl>(shared_from_this()));
thread_pool_->AddTask(task);
}

Expand Down
25 changes: 19 additions & 6 deletions src/sdk/file_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <common/timer.h>
#include <common/sliding_window.h>
#include <common/thread_pool.h>
#include <boost/enable_shared_from_this.hpp>
#include <boost/weak_ptr.hpp>

#include "proto/nameserver.pb.h"
#include "proto/chunkserver.pb.h"
Expand Down Expand Up @@ -65,7 +67,7 @@ struct LocatedBlocks {
}
};

class FileImpl : public File {
class FileImpl : public File, public boost::enable_shared_from_this<FileImpl> {
public:
FileImpl(FSImpl* fs, RpcClient* rpc_client, const std::string& name,
int32_t flags, const WriteOptions& options);
Expand All @@ -79,18 +81,20 @@ class FileImpl : public File {
/// Add buffer to async write list
void StartWrite();
/// Send local buffer to chunkserver
void BackgroundWrite();
static void BackgroundWrite(boost::weak_ptr<FileImpl> wk_fp);
/// Callback for sliding window
void OnWriteCommit(int32_t, int32_t);
void WriteBlockCallback(const WriteBlockRequest* request,
static void OnWriteCommit(int32_t, int32_t);
static void WriteBlockCallback(boost::weak_ptr<FileImpl> wk_fp,
const WriteBlockRequest* request,
WriteBlockResponse* response,
bool failed, int error,
int retry_times,
WriteBuffer* buffer,
std::string cs_addr);
/// When rpc buffer full deley send write reqeust
void DelayWriteChunk(WriteBuffer* buffer, const WriteBlockRequest* request,
int retry_times, std::string cs_addr);
static void DelayWriteChunk(boost::weak_ptr<FileImpl> wk_fp,
WriteBuffer* buffer, const WriteBlockRequest* request,
int retry_times, std::string cs_addr);
int32_t Flush();
int32_t Sync();
int32_t Close();
Expand All @@ -104,6 +108,15 @@ class FileImpl : public File {
private:
int32_t AddBlock();
bool CheckWriteWindows();
void BackgroundWriteInternal();
void WriteBlockCallbackInternal(const WriteBlockRequest* request,
WriteBlockResponse* response,
bool failed, int error,
int retry_times,
WriteBuffer* buffer,
std::string cs_addr);
void DelayWriteChunkInternal(WriteBuffer* buffer, const WriteBlockRequest* request,
int retry_times, std::string cs_addr);
private:
FSImpl* fs_; ///< 文件系统
RpcClient* rpc_client_; ///< RpcClient
Expand Down

0 comments on commit cf1b34c

Please sign in to comment.