diff --git a/floyd/include/floyd.h b/floyd/include/floyd.h index 058e50a..69b3425 100644 --- a/floyd/include/floyd.h +++ b/floyd/include/floyd.h @@ -35,11 +35,11 @@ class Floyd { virtual Status AddServer(const std::string& new_server) = 0; // return true if leader has been elected + virtual bool IsLeader() = 0; virtual bool GetLeader(std::string* ip_port) = 0; virtual bool GetLeader(std::string* ip, int* port) = 0; virtual bool HasLeader() = 0; virtual bool GetAllNodes(std::vector* nodes) = 0; - virtual bool IsLeader() = 0; // used for debug virtual bool GetServerStatus(std::string* msg) = 0; diff --git a/floyd/src/floyd_context.h b/floyd/src/floyd_context.h index a57759a..6a9a110 100644 --- a/floyd/src/floyd_context.h +++ b/floyd/src/floyd_context.h @@ -65,8 +65,13 @@ struct FloydContext { std::atomic last_applied; uint64_t last_op_time; + // used in membership change std::vector members; + // used in lease leader + uint64_t lease_start; + uint64_t lease_end; + // mutex protect commit_index // used in floyd_apply thread and floyd_peer thread slash::Mutex global_mu; diff --git a/floyd/src/floyd_impl.cc b/floyd/src/floyd_impl.cc index 7109d70..39f75a6 100644 --- a/floyd/src/floyd_impl.cc +++ b/floyd/src/floyd_impl.cc @@ -61,35 +61,43 @@ bool FloydImpl::IsSelf(const std::string& ip_port) { return (ip_port == slash::IpPortString(options_.local_ip, options_.local_port)); } -bool FloydImpl::GetLeader(std::string *ip_port) { - if (context_->leader_ip.empty() || context_->leader_port == 0) { - return false; +/* + * if the leader is expired, then the getLeader, IsLeader will return false + */ +bool FloydImpl::IsLeader() { + uint64_t now = slash::NowMicros(); + if (context_->leader_ip == options_.local_ip && context_->leader_port == options_.local_port + && context_->lease_start <= now && now < context_->lease_end) { + return true; } - *ip_port = slash::IpPortString(context_->leader_ip, context_->leader_port); - return true; + return false; } -bool FloydImpl::IsLeader() { - if (context_->leader_ip == "" || context_->leader_port == 0) { - return false; - } - if (context_->leader_ip == options_.local_ip && context_->leader_port == options_.local_port) { +bool FloydImpl::GetLeader(std::string *ip_port) { + uint64_t now = slash::NowMicros(); + if (context_->lease_start <= now && now < context_->lease_end) { + *ip_port = slash::IpPortString(options_.local_ip, options_.local_port) return true; } return false; } bool FloydImpl::GetLeader(std::string* ip, int* port) { - *ip = context_->leader_ip; - *port = context_->leader_port; - return (!ip->empty() && *port != 0); + uint64_t now = slash::NowMicros(); + if (context_->lease_start <= now && now < context_->lease_end) { + *ip = context_->leader_ip; + *port = context_->leader_port; + return true; + } + return false; } bool FloydImpl::HasLeader() { - if (context_->leader_ip == "" || context_->leader_port == 0) { - return false; + if (context_->leader_ip != "" && context_->leader_port != 0 + && context_->lease_start <= now && now < context_->lease_end) { + return true; } - return true; + return false; } bool FloydImpl::GetAllNodes(std::vector* nodes) { diff --git a/floyd/src/floyd_impl.h b/floyd/src/floyd_impl.h index b1cf6a4..bc42743 100644 --- a/floyd/src/floyd_impl.h +++ b/floyd/src/floyd_impl.h @@ -57,11 +57,11 @@ class FloydImpl : public Floyd { virtual Status AddServer(const std::string& new_server) override; // return true if leader has been elected + virtual bool IsLeader() override; virtual bool GetLeader(std::string* ip_port) override; virtual bool GetLeader(std::string* ip, int* port) override; virtual bool HasLeader() override; virtual bool GetAllNodes(std::vector* nodes) override; - virtual bool IsLeader() override; int GetLocalPort() { return options_.local_port; diff --git a/floyd/src/floyd_peer_thread.cc b/floyd/src/floyd_peer_thread.cc index 6002636..a53c45f 100644 --- a/floyd/src/floyd_peer_thread.cc +++ b/floyd/src/floyd_peer_thread.cc @@ -90,6 +90,9 @@ void Peer::RequestVoteRPCWrapper(void *arg) { reinterpret_cast(arg)->RequestVoteRPC(); } +/* + * support lease leader version + */ void Peer::RequestVoteRPC() { uint64_t last_log_term; uint64_t last_log_index; @@ -113,7 +116,7 @@ void Peer::RequestVoteRPC() { Status result = pool_->SendAndRecv(peer_addr_, req, &res); if (!result.ok()) { - LOGV(DEBUG_LEVEL, info_log_, "Peer::RequestVoteRPC: RequestVote to %s failed %s", + LOGV(WARN_LEVEL, info_log_, "Peer::RequestVoteRPC: RequestVote to %s failed %s", peer_addr_.c_str(), result.ToString().c_str()); return; }