Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -789,4 +789,10 @@ std::string Connection::RemoteEndpointStr() const {
return connection_str;
}

std::string Connection::RemoteEndpointAddress() const {
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
auto re = lsb->RemoteEndpoint();
return re.address().to_string();
}

} // namespace facade
1 change: 1 addition & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class Connection : public util::Connection {

std::string GetClientInfo() const;
std::string RemoteEndpointStr() const;
std::string RemoteEndpointAddress() const;
uint32 GetClientId() const;

void ShutdownSelf();
Expand Down
45 changes: 38 additions & 7 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ struct TransactionGuard {

} // namespace

DflyCmd::ReplicaRoleInfo::ReplicaRoleInfo(std::string address, SyncState sync_state)
: address(address) {
switch (sync_state) {
case SyncState::PREPARATION:
state = "preparation";
break;
case SyncState::FULL_SYNC:
state = "full sync";
break;
case SyncState::STABLE_SYNC:
state = "stable sync";
break;
case SyncState::CANCELLED:
state = "cancelled";
break;
default:
break;
}
}

DflyCmd::DflyCmd(util::ListenerInterface* listener, ServerFamily* server_family)
: sf_(server_family), listener_(listener) {
}
Expand Down Expand Up @@ -213,7 +233,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
return;

unique_lock lk(replica_ptr->mu);
if (replica_ptr->state != SyncState::PREPARATION)
if (replica_ptr->state.load(memory_order_relaxed) != SyncState::PREPARATION)
return rb->SendError(kInvalidState);

// Set meta info on connection.
Expand Down Expand Up @@ -263,7 +283,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
return rb->SendError(kInvalidState);
}

replica_ptr->state = SyncState::FULL_SYNC;
replica_ptr->state.store(SyncState::FULL_SYNC, memory_order_relaxed);
return rb->SendOk();
}

Expand Down Expand Up @@ -299,7 +319,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
return rb->SendError(kInvalidState);
}

replica_ptr->state = SyncState::STABLE_SYNC;
replica_ptr->state.store(SyncState::STABLE_SYNC, memory_order_relaxed);
return rb->SendOk();
}

Expand Down Expand Up @@ -403,7 +423,8 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
}
}

uint32_t DflyCmd::CreateSyncSession() {
uint32_t DflyCmd::CreateSyncSession(ConnectionContext* cntx) {
;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird ;

unique_lock lk(mu_);
unsigned sync_id = next_sync_id_++;

Expand All @@ -416,7 +437,8 @@ uint32_t DflyCmd::CreateSyncSession() {
::boost::fibers::fiber{&DflyCmd::StopReplication, this, sync_id}.detach();
};

auto replica_ptr = make_shared<ReplicaInfo>(flow_count, std::move(err_handler));
auto replica_ptr = make_shared<ReplicaInfo>(flow_count, cntx->owner()->RemoteEndpointAddress(),
std::move(err_handler));
auto [it, inserted] = replica_infos_.emplace(sync_id, std::move(replica_ptr));
CHECK(inserted);

Expand Down Expand Up @@ -448,14 +470,14 @@ void DflyCmd::StopReplication(uint32_t sync_id) {

void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr<ReplicaInfo> replica_ptr) {
lock_guard lk(replica_ptr->mu);
if (replica_ptr->state == SyncState::CANCELLED) {
if (replica_ptr->state.load(memory_order_relaxed) == SyncState::CANCELLED) {
return;
}

LOG(INFO) << "Cancelling sync session " << sync_id;

// Update replica_ptr state and cancel context.
replica_ptr->state = SyncState::CANCELLED;
replica_ptr->state.store(SyncState::CANCELLED, memory_order_release);
replica_ptr->cntx.Cancel();

// Run cleanup for shard threads.
Expand Down Expand Up @@ -503,6 +525,15 @@ shared_ptr<DflyCmd::ReplicaInfo> DflyCmd::GetReplicaInfo(uint32_t sync_id) {
return {};
}

std::vector<DflyCmd::ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() {
std::vector<ReplicaRoleInfo> vec;
unique_lock lk(mu_);
for (const auto& info : replica_infos_) {
vec.emplace_back(info.second->address, info.second->state.load(memory_order_relaxed));
}
return vec;
}

pair<uint32_t, shared_ptr<DflyCmd::ReplicaInfo>> DflyCmd::GetReplicaInfoOrReply(
std::string_view id_str, RedisReplyBuilder* rb) {
unique_lock lk(mu_);
Expand Down
17 changes: 13 additions & 4 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,23 @@ class DflyCmd {

// Stores information related to a single replica.
struct ReplicaInfo {
ReplicaInfo(unsigned flow_count, Context::ErrHandler err_handler)
: state{SyncState::PREPARATION}, cntx{std::move(err_handler)}, flows{flow_count} {
ReplicaInfo(unsigned flow_count, std::string address, Context::ErrHandler err_handler)
: state{SyncState::PREPARATION}, address{address}, cntx{std::move(err_handler)},
flows{flow_count} {
}

SyncState state;
std::atomic<SyncState> state;
std::string address;
Context cntx;

std::vector<FlowInfo> flows;
::boost::fibers::mutex mu; // See top of header for locking levels.
};
struct ReplicaRoleInfo {
ReplicaRoleInfo(std::string address, SyncState sync_state);
std::string address;
std::string state;
};

public:
DflyCmd(util::ListenerInterface* listener, ServerFamily* server_family);
Expand All @@ -125,7 +132,9 @@ class DflyCmd {
void Shutdown();

// Create new sync session.
uint32_t CreateSyncSession();
uint32_t CreateSyncSession(ConnectionContext* cntx);

std::vector<ReplicaRoleInfo> GetReplicasRoleInfo();

private:
// JOURNAL [START/STOP]
Expand Down
32 changes: 29 additions & 3 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("connected_slaves", m.conn_stats.num_replicas);
append("master_replid", master_id_);
} else {
append("role", "slave");
append("role", "replica");

// it's safe to access replica_ because replica_ is created before etl.is_master set to
// false and cleared after etl.is_master is set to true. And since the code here that checks
Expand Down Expand Up @@ -1545,7 +1545,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
std::string_view arg = ArgS(args, i + 1);
if (cmd == "CAPA") {
if (arg == "dragonfly" && args.size() == 3 && i == 1) {
uint32_t sid = dfly_cmd_->CreateSyncSession();
uint32_t sid = dfly_cmd_->CreateSyncSession(cntx);
cntx->owner()->SetName(absl::StrCat("repl_ctrl_", sid));

string sync_id = absl::StrCat("SYNC", sid);
Expand Down Expand Up @@ -1576,7 +1576,33 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
}

void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendRaw("*3\r\n$6\r\nmaster\r\n:0\r\n*0\r\n");
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
(*cntx)->StartArray(2);
(*cntx)->SendBulkString("master");
auto vec = dfly_cmd_->GetReplicasRoleInfo();
(*cntx)->StartArray(vec.size());
for (auto& data : vec) {
(*cntx)->StartArray(2);
(*cntx)->SendBulkString(data.address);
(*cntx)->SendBulkString(data.state);
}

} else {
auto replica_ptr = replica_;
Replica::Info rinfo = replica_ptr->GetInfo();
(*cntx)->StartArray(4);
(*cntx)->SendBulkString("replica");
(*cntx)->SendBulkString(rinfo.host);
(*cntx)->SendBulkString(absl::StrCat(rinfo.port));
if (rinfo.sync_in_progress) {
(*cntx)->SendBulkString("full sync");
} else if (!rinfo.master_link_established) {
(*cntx)->SendBulkString("connecting");
} else {
(*cntx)->SendBulkString("stable sync");
}
}
}

void ServerFamily::Script(CmdArgList args, ConnectionContext* cntx) {
Expand Down