Skip to content

Commit

Permalink
Add sWorker API (#14)
Browse files Browse the repository at this point in the history
* Add sWorker api and complete basic decision flow

* Fix lint

* Fix build
  • Loading branch information
badkk authored Jan 7, 2021
1 parent f066c57 commit c0b1a65
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 128 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
},
"dependencies": {
"@polkadot/api": "^3.3.1",
"axios": "^0.21.1",
"bignumber.js": "^9.0.1",
"ipfs-http-client": "^48.0.0",
"ipfs-http-client": "^48.1.3",
"lodash": "^4.17.20",
"node-cron": "^2.0.3",
"winston": "^3.3.3"
Expand Down
1 change: 1 addition & 0 deletions src/chain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export default class CrustApi {
});
}

/// READ methods
/**
* Register a pubsub event, dealing with new block
* @param handler handling with new block
Expand Down
116 changes: 78 additions & 38 deletions src/decision/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import TaskQueue, {BT} from '../queue';
import IpfsApi from '../ipfs';
import CrustApi, {DetailFileInfo, FileInfo} from '../chain';
import {logger} from '../log';
import {hexToString} from '../util';
import {gigaBytesToBytes, hexToString} from '../util';
import SworkerApi from '../sworker';

// The initial probability is 5‰
const initialProbability = 0.005;
Expand All @@ -22,15 +23,23 @@ interface Task extends BT {
export default class DecisionEngine {
private readonly crustApi: CrustApi;
private readonly ipfsApi: IpfsApi;
private readonly sworkerApi: SworkerApi;
private pullingQueue: TaskQueue<Task>;
private sealingQueue: TaskQueue<Task>;
private currentBn: number;

constructor(chainAddr: string, ipfsAddr: string, mto: number) {
constructor(
chainAddr: string,
ipfsAddr: string,
sworkerAddr: string,
ito: number,
sto: number
) {
this.crustApi = new CrustApi(chainAddr);
this.ipfsApi = new IpfsApi(ipfsAddr, mto);
this.ipfsApi = new IpfsApi(ipfsAddr, ito);
this.sworkerApi = new SworkerApi(sworkerAddr, sto);

// MaxQueueLength is 50 and Overdue is 600 blocks(1h)
// MaxQueueLength is 50 and Expired with 600 blocks(1h)
this.pullingQueue = new TaskQueue<Task>(50, 600);
this.sealingQueue = new TaskQueue<Task>(30, 600);

Expand Down Expand Up @@ -73,7 +82,7 @@ export default class DecisionEngine {
size: newFile.size,
};
logger.info(
` ↪ 🎁 Found new file, adding it to pulling queue ${JSON.stringify(
` ↪ Found new file, adding it to pulling queue ${JSON.stringify(
nt
)}`
);
Expand Down Expand Up @@ -107,9 +116,9 @@ export default class DecisionEngine {

for (const pt of oldPts) {
// 2. If picked, start pulling from ipfs; otherwise push back to pulling tasks
if (await this.pickOrDropPulling(pt)) {
if (await this.pickUpPulling(pt)) {
logger.info(
` ↪ 🎁 Pick pulling task ${JSON.stringify(pt)}, pulling from ipfs`
` ↪ 🗳 Pick pulling task ${JSON.stringify(pt)}, pulling from ipfs`
);
// Async pulling
this.ipfsApi
Expand Down Expand Up @@ -147,24 +156,32 @@ export default class DecisionEngine {
*/
async subscribeSealings(): Promise<cron.ScheduledTask> {
return cron.schedule('* * * * *', async () => {
const oldPts: Task[] = this.sealingQueue.tasks;
const newPts = new Array<Task>();
const oldSts: Task[] = this.sealingQueue.tasks;
const newSts = new Array<Task>();
logger.info('⏳ Checking sealing queue...');
logger.info(` ↪ 💌 Sealing queue length: ${oldPts.length}`);
logger.info(` ↪ 💌 Sealing queue length: ${oldSts.length}`);

// 1. Loop sealing tasks
for (const pt of oldPts) {
for (const st of oldSts) {
// 2. Judge if sealing successful, otherwise push back to sealing tasks
if (await this.pickOrDropSealing(pt.cid, pt.size)) {
// TODO: Call `sWorker.seal(pt.cid)` here
logger.info(` ↪ ⚙️ Send to sWorker: ${JSON.stringify(pt)}`);
} else {
newPts.push(pt);
if (await this.pickUpSealing(st)) {
logger.info(
` ↪ 🗳 Pick sealing task ${JSON.stringify(st)}, sending to sWorker`
);
if (await this.sworkerApi.seal(st.cid)) {
logger.info(` ↪ 💖 Seal ${st.cid} successfully`);
continue; // Continue with next sealing task
} else {
logger.error(` ↪ 💥 Seal ${st.cid} failed`);
}
}

// Otherwise, push back to sealing queue
newSts.push(st);
}

// 3. Push back to sealing queue
this.sealingQueue.tasks = newPts;
this.sealingQueue.tasks = newSts;
});
}

Expand All @@ -174,12 +191,11 @@ export default class DecisionEngine {
/// 2. judge file size and free space from local ipfs repo;
/**
* Add or ignore to pulling queue by a given cid
* @param cid ipfs cid value
* @param f_size truly file size
* @param t Task
* @returns if can pick
*/
// TODO: add pulling pick up strategy here, basically random with pks?
private async pickOrDropPulling(t: Task): Promise<boolean> {
private async pickUpPulling(t: Task): Promise<boolean> {
try {
// 1. Get and judge file size is match
const size = await this.ipfsApi.size(t.cid);
Expand All @@ -190,10 +206,8 @@ export default class DecisionEngine {
}

// 2. Get and judge repo can take it, make sure the free can take double file
// TODO: Remove this, cause this is no fucking use
const free = await this.ipfsApi.free();
const bn_f_size = new BigNumber(t.size);
if (free <= bn_f_size.multipliedBy(2)) {
const free = await this.freeSpace();
if (free <= t.size * 2) {
logger.warn(` ↪ ⚠️ Free space not enought ${free} < ${size}*2`);
return false;
}
Expand All @@ -208,41 +222,58 @@ export default class DecisionEngine {

/**
* Pick or drop sealing queue by a given cid
* @param cid ipfs cid value
* @param f_size truly file size
* @param t Task
*/
private async pickOrDropSealing(
_cid: string,
_f_size: number
): Promise<boolean> {
// TODO: check free space or just send into sWorker?
return true;
/**
 * Decide whether a sealing task should be handed to sWorker.
 * Requires the sWorker-reported free space to exceed the file size,
 * and the file's on-chain replica set to not be full yet.
 * @param t sealing task
 * @returns true when the task can be picked up
 */
private async pickUpSealing(t: Task): Promise<boolean> {
  const free = await this.freeSpace();

  // Not enough local space to hold the file
  if (free <= t.size) {
    logger.warn(` ↪ ⚠️ Free space not enough ${free} < ${t.size}`);
    return false;
  }

  return !(await this.isReplicaFull(t.cid));
}

/**
* Query the given cid is already been picked plus a certain
* probability
* Judge if replica on chain is full
* @param cid ipfs cid value
* @param bn task block number
* @returns boolean
* @throws crustApi error
*/
private async shouldPull(cid: string, bn: number): Promise<boolean> {
/**
 * Judge if the on-chain replica set for a file is already full
 * @param cid ipfs cid value
 * @returns true when replicas exceed the expected count
 * @throws crustApi error
 */
private async isReplicaFull(cid: string): Promise<boolean> {
  // TODO: Set flag to let user choose enable the `only take order file`
  const fileInfo: DetailFileInfo | null = await this.crustApi.maybeGetNewFile(
    cid
  );

  // Unknown file, or replica count still within expectation → not full.
  // NOTE(review): '>' means a count exactly equal to expected_replica_count
  // is still treated as not full — confirm this boundary is intended.
  if (
    !fileInfo ||
    fileInfo.replicas.length <= Number(fileInfo.expected_replica_count)
  ) {
    return false;
  }

  logger.warn(
    ` ↪ ⚠️ File replica already full with ${fileInfo.replicas.length}`
  );
  return true;
}

/**
* Query the given cid is already been picked plus a certain
* probability
* @param cid ipfs cid value
* @param bn task block number
* @returns should pull from ipfs
* @throws crustApi error
*/
private async shouldPull(cid: string, bn: number): Promise<boolean> {
// If replicas already reach the limit
if (await this.isReplicaFull(cid)) {
return false;
}
// else, calculate the probability with `expired_date`
// Else, calculate the probability with `expired_date`

// 1. Generate a number between 0 and 1
const randNum = Math.random();
Expand All @@ -257,4 +288,13 @@ export default class DecisionEngine {
// 3. Judge if we hit the spot
return randNum < probability;
}

/**
 * Got free space size from sWorker
 * sWorker reports gigabytes; convert to bytes for size comparisons
 * @returns free space size in bytes
 */
private async freeSpace(): Promise<number> {
  return gigaBytesToBytes(await this.sworkerApi.free());
}
}
12 changes: 10 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@ import {logger} from './log';

const chainAddr = argv[2] || 'ws://localhost:9944';
const ipfsAddr = argv[3] || 'http://localhost:5001';
const maxIpfsTimeout = 20000; // 20s
const sworkerAddr = argv[4] || 'http://localhost:12222';
const ipfsTimeout = 20000; // 20s
const sworkerTimeout = 20000; //20s

try {
const de = new DecisionEngine(chainAddr, ipfsAddr, maxIpfsTimeout);
const de = new DecisionEngine(
chainAddr,
ipfsAddr,
sworkerAddr,
ipfsTimeout,
sworkerTimeout
);

// TODO: Get cancellation signal and handle errors?
de.subscribeNewFiles().catch(e =>
Expand Down
2 changes: 1 addition & 1 deletion src/ipfs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const IpfsHttpClient = require('ipfs-http-client');
const {CID} = require('ipfs-http-client');

export default class IpfsApi {
private ipfs: any;
private readonly ipfs: any;

constructor(ipfsAddr: string, mto: number) {
// TODO: Check connection and ipfsAddr is legal
Expand Down
64 changes: 64 additions & 0 deletions src/sworker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import axios, {AxiosInstance} from 'axios';
import {parseObj} from '../util';

export default class SworkerApi {
  // Pre-configured HTTP client pointed at the sWorker `/api/v0` endpoint
  private readonly sworker: AxiosInstance;

  /**
   * Build an sWorker API client
   * @param sworkerAddr sWorker base address (e.g. http://localhost:12222)
   * @param to request timeout in milliseconds
   */
  constructor(sworkerAddr: string, to: number) {
    this.sworker = axios.create({
      baseURL: sworkerAddr + '/api/v0',
      timeout: to,
      headers: {'Content-Type': 'application/json'},
    });
  }

  /// WRITE methods
  /**
   * Seal cid
   * @param cid ipfs cid
   * @returns seal success or failed
   * @throws sWorker api error | timeout
   */
  async seal(cid: string): Promise<boolean> {
    const payload = JSON.stringify({cid});
    const resp = await this.sworker.post('/storage/seal', payload);

    return resp.status === 200;
  }

  /**
   * Delete both origin and sealed file by cid
   * @param cid ipfs cid
   * @returns delete success or failed
   * @throws sWorker api error | timeout
   */
  async delete(cid: string): Promise<boolean> {
    const payload = JSON.stringify({cid});
    const resp = await this.sworker.post('/storage/delete', payload);

    return resp.status === 200;
  }

  /// READ methods
  /**
   * Query local free storage size
   * @returns free space size(GB); 0 when the workload query fails
   * @throws sWorker api error | timeout
   */
  async free(): Promise<number> {
    const resp = await this.sworker.get('/workload');

    if (!resp || resp.status !== 200) {
      return 0;
    }

    // Free = already-filled srd space + still-available disk space
    const workload = parseObj(resp.data);
    return (
      Number(workload.srd['srd_complete']) +
      Number(workload.srd['disk_available'])
    );
  }
}
11 changes: 11 additions & 0 deletions src/util/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,14 @@ export function parseObj(o: any) {
/**
 * Decode a 0x-prefixed hex string into its utf-8 text
 * @param hex hex string starting with '0x'
 * @returns decoded string
 */
export function hexToString(hex: string): string {
  const raw = hex.slice(2);
  return Buffer.from(raw, 'hex').toString();
}

/**
 * GB to B
 * Number.MAX_SAFE_INTEGER is 9007199254740991, far larger than any
 * realistic size here, so plain numbers suffice — no BigNumber needed
 * @param gb GB size
 * @returns B size
 */
export function gigaBytesToBytes(gb: number): number {
  return gb * 1024 * 1024 * 1024;
}
Loading

0 comments on commit c0b1a65

Please sign in to comment.