Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RONDB-763: Parse Redis commands to data structures #19

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions pink/rondis/cmd_builder/client_conn.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <utility>
#include <vector>

#include "slash_string.h"

#include "command.h"
#include "cmd_table_manager.h"
#include "client_conn.h"

extern std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;


std::shared_ptr<Cmd> PikaClientConn::DoCmd(const pink::RedisCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc) {
// Get command info
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt);
if (!c_ptr) {
return c_ptr;
}
c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc);
c_ptr->SetResp(resp_ptr);

// Initial
c_ptr->Initial(argv);
if (!c_ptr->res().ok()) {
return c_ptr;
}

// Process Command
c_ptr->Execute();

return c_ptr;
}

void PikaClientConn::TryWriteResp() {
int expected = 0;
if (resp_num.compare_exchange_strong(expected, -1)) {
for (auto& resp : resp_array) {
WriteResp(*resp);
}
if (write_completed_cb_) {
write_completed_cb_();
write_completed_cb_ = nullptr;
}
resp_array.clear();
NotifyEpoll(true);
}
}

void PikaClientConn::PushCmdToQue(std::shared_ptr<Cmd> cmd) { txn_cmd_que_.push(cmd); }

void PikaClientConn::ClearTxnCmdQue() { txn_cmd_que_ = std::queue<std::shared_ptr<Cmd>>{}; }

void PikaClientConn::ExecRedisCmd(const pink::RedisCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr,
bool cache_miss_in_rtc) {
// get opt
std::string opt = argv[0];
slash::StringToLower(opt);
if (opt == kClusterPrefix) {
if (argv.size() >= 2) {
opt += argv[1];
slash::StringToLower(opt);
}
}

std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc);
*resp_ptr = std::move(cmd_ptr->res().message());
resp_num--;
}

std::queue<std::shared_ptr<Cmd>> PikaClientConn::GetTxnCmdQue() { return txn_cmd_que_; }
54 changes: 54 additions & 0 deletions pink/rondis/cmd_builder/client_conn.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_CLIENT_CONN_H_
#define PIKA_CLIENT_CONN_H_

#include <bitset>
#include <utility>

#include "command.h"

class PikaClientConn : public pink::RedisConn {
public:
using WriteCompleteCallback = std::function<void()>;

PikaClientConn(int fd, const std::string& ip_port, pink::Thread* server_thread, pink::NetMultiplexer* mpx,
const pink::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() = default;

int DealMessage(const pink::RedisCmdArgsType& argv, std::string* response) override { return 0; }

void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }

// Txn
std::queue<std::shared_ptr<Cmd>> GetTxnCmdQue();
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
void ClearTxnCmdQue();

pink::ServerThread* server_thread() { return server_thread_; }

std::atomic<int> resp_num;
std::vector<std::shared_ptr<std::string>> resp_array;

private:
pink::ServerThread* const server_thread_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_ = false;
std::queue<std::shared_ptr<Cmd>> txn_cmd_que_;
std::bitset<16> txn_state_;
std::unordered_set<std::string> watched_db_keys_;
std::mutex txn_state_mu_;

bool authenticated_ = false;

std::shared_ptr<Cmd> DoCmd(const pink::RedisCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

void ExecRedisCmd(const pink::RedisCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
void TryWriteResp();
};

#endif
38 changes: 38 additions & 0 deletions pink/rondis/cmd_builder/cmd_table_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "cmd_table_manager.h"

#include <sys/syscall.h>
#include <unistd.h>


PikaCmdTableManager::PikaCmdTableManager() {
cmds_ = std::make_unique<CmdTable>();
cmds_->reserve(300);
}

void PikaCmdTableManager::InitCmdTable(void) {
::InitCmdTable(cmds_.get());
}

std::shared_ptr<Cmd> PikaCmdTableManager::GetCmd(const std::string& opt) {
const std::string& internal_opt = opt;
return NewCommand(internal_opt);
}

std::shared_ptr<Cmd> PikaCmdTableManager::NewCommand(const std::string& opt) {
Cmd* cmd = GetCmdFromDB(opt, *cmds_);
if (cmd) {
return std::shared_ptr<Cmd>(cmd->Clone());
}
return nullptr;
}

CmdTable* PikaCmdTableManager::GetCmdTable() { return cmds_.get(); }

uint32_t PikaCmdTableManager::GetMaxCmdId() { return cmdId_; }

bool PikaCmdTableManager::CmdExist(const std::string& cmd) const { return cmds_->find(cmd) != cmds_->end(); }
35 changes: 35 additions & 0 deletions pink/rondis/cmd_builder/cmd_table_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_CMD_TABLE_MANAGER_H_
#define PIKA_CMD_TABLE_MANAGER_H_

#include <shared_mutex>
#include <thread>

#include "command.h"

class PikaCmdTableManager {

public:
PikaCmdTableManager();
virtual ~PikaCmdTableManager() = default;
void InitCmdTable(void);
std::shared_ptr<Cmd> GetCmd(const std::string& opt);
bool CmdExist(const std::string& cmd) const;
CmdTable* GetCmdTable();
uint32_t GetMaxCmdId();

private:
std::shared_ptr<Cmd> NewCommand(const std::string& opt);

std::unique_ptr<CmdTable> cmds_;

uint32_t cmdId_ = 0;

std::shared_mutex map_protector_;

};
#endif
Loading