1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
use super::Map;
use crate::rpc::*;
use bigdecimal::BigDecimal;
use serde::*;
use serde_json::Value;
use std::collections::HashMap;

rpc_interface! {

    trait GolemComp {
        //
        // map kwarg force
        // Returns:
        //   Some(task_id), None,
        //   None, Some(error_message)
        #[id = "comp.task.create"]
        fn create_task_int(&self, task_spec: serde_json::Value) -> Result<(Option<String>, Option<Value>)>;

        #[id = "comp.task"]
        fn get_task(&self, task_id : String) -> Result<Option<TaskInfo>>;

        //
        // *Implementation note*
        // uri comp.tasks has optional argument task_id. with task_id
        // it works as get_task. We do not need this variant.
        //
        #[id = "comp.tasks"]
        fn get_tasks(&self) -> Result<Vec<TaskInfo>>;

        /// Show statistics for unsupported tasks.
        ///
        /// # Arguments
        ///
        /// * `last_days` -  Number of last days to compute statistics on.
        ///
        /// # Returns
        ///
        /// Vec of UnsupportInfo. With stats for each reason.
        ///
        #[id = "comp.tasks.unsupport"]
        fn get_tasks_unsupported(&self, last_days: i32) -> Result<Vec<UnsupportInfo>>;

        /// Abort task with given id.
        ///
        /// # Arguments
        ///
        /// * `task_id` - Task id to abort.
        ///
        #[id = "comp.task.abort"]
        fn abort_task(&self, task_id : String) -> Result<()>;

        #[id = "comp.task.delete"]
        fn delete_task(&self, taks_id : String) -> Result<()>;

        #[id = "comp.task.subtask.restart"]
        fn restart_subtask(&self, subtask_id : String) -> Result<()>;

        #[id = "comp.task.subtask"]
        fn get_subtask(&self, subtask_id : String) -> Result<(Option<SubtaskInfo>, Option<String>)>;

        #[id = "comp.task.subtasks"]
        fn get_subtasks(&self, task_id : String) -> Result<Option<Vec<SubtaskInfo>>>;

        #[id = "comp.task.purge"]
        fn purge_tasks(&self) -> Result<()>;

        //
        // (new_task_id, None) on success; (None, error_message) on failure
        #[id = "comp.task.restart"]
        fn restart_task(&self, task_id: String) -> Result<(Option<String>, Option<String>)>;

        // TODO:
        #[id = "comp.task.subtasks.frame.restart"]
        fn restart_frame_subtasks(&self, task_id: String, frame: u32) -> Result<()>;

        /// Restarts a set of subtasks from the given task. If the specified task is
        ///  already finished, all failed subtasks will be restarted along with the
        ///  set provided as a parameter. Finished subtasks will have their results
        ///  copied over to the newly created task.
        ///
        /// ## Parameters
        ///
        ///  * `task_id`  the ID of the task which contains the given subtasks.
        ///  * `subtask_ids` the set of subtask IDs which should be restarted. If this is
        /// empty and the task is finished, all of the task's subtasks marked as failed will be
        /// restarted.
        ///  * `ignore_gas_price` if True, this will ignore long transaction time
        ///        errors and proceed with the restart.
        ///  * `disable_concent`  setting this flag to True will result in forcing
        ///       Concent to be disabled for the task. This only has effect when the task
        ///        is already finished and needs to be restarted.
        ///
        ///  ##Returns
        ///
        ///  In case of any errors, returns the representation of the error
        /// (either a string or a dict). Otherwise, returns None.
        ///
        #[id = "comp.task.subtasks.restart"]
        fn restart_subtasks_from_task(&self, task_id: String, subtask_ids: Vec<String>) -> Result<Value>;

        //
        #[id = "comp.tasks.check"]
        fn run_test_task(&self, task_spec: serde_json::Value) -> Result<bool>;

        #[id = "comp.task.test.status"]
        fn check_test_status(&self) -> Result<TaskTestResult>;

        /// Returns true if there was task to cancel
        #[id = "comp.tasks.check.abort"]
        fn abort_test_task(&self) -> Result<bool>;

        #[id = "comp.tasks.stats"]
        fn get_tasks_stats(&self) -> Result<SubtaskStats>;

        #[id = "comp.environments"]
        fn get_environments(&self) -> Result<Vec<CompEnvStatus>>;

        /// Enables enviroment
        /// Returns None or Error message.
        #[id = "comp.environment.enable"]
        fn enable_environment(&self, env_id : String) -> Result<Option<String>>;

        /// Enables enviroment
        /// Returns None or Error message.
        #[id = "comp.environment.disable"]
        fn disable_environment(&self, env_id : String) -> Result<Option<String>>;

        #[id = "comp.environment.benchmark"]
        fn run_benchmark(&self, env_id : String) -> Result<Value>;

        // timeout=3s
        #[id = "performance.multiplier.update"]
        fn perf_mult_set(&self, multiplier : f64) -> Result<()>;

        #[id = "performance.multiplier"]
        fn perf_mult(&self) -> Result<f64>;

    }
}

impl<'a, Inner: crate::rpc::wamp::RpcEndpoint + ?Sized + 'static> GolemComp<'a, Inner> {
    //
    // map kwarg force
    // Returns:
    //   Some(task_id), None,
    //   None, Some(error_message)
    pub fn create_task(
        &self,
        task_spec: serde_json::Value,
    ) -> impl Future<Item = String, Error = crate::Error> {
        fn map_to_error<F: FnOnce(&String) -> String>(
            err_obj: Value,
            format_msg: F,
        ) -> crate::Error {
            match err_obj {
                Value::String(err_msg) => crate::Error::Other(format_msg(&err_msg)),
                Value::Object(err_obj) => match err_obj.get("error_msg") {
                    Some(Value::String(err_msg)) => crate::Error::Other(format_msg(&err_msg)),
                    _ => crate::Error::Other(format!("invalid error response: {:?}", err_obj)),
                },
                _ => crate::Error::Other(format!("invalid error response: {:?}", err_obj)),
            }
        }

        self.create_task_int(task_spec)
            .from_err()
            .and_then(|r: (Option<String>, Option<Value>)| match r {
                (Some(task_id), Some(err_obj)) => Err(map_to_error(err_obj, |err_msg| {
                    format!("task {} failed: {}", task_id, err_msg)
                })),
                (Some(task_id), None) => Ok(task_id),
                (None, Some(err_obj)) => Err(map_to_error(err_obj, |err_msg| err_msg.to_string())),
                (None, None) => Err(crate::Error::Other(format!("invalid error response: null"))),
            })
    }
}

pub trait AsGolemComp: wamp::RpcEndpoint {
    fn as_golem_comp<'a>(&'a self) -> GolemComp<'a, Self>;
}

impl<Endpoint: wamp::RpcEndpoint> AsGolemComp for Endpoint {
    fn as_golem_comp<'a>(&'a self) -> GolemComp<'a, Endpoint> {
        GolemComp(self.as_invoker())
    }
}

#[derive(Serialize, Deserialize, Debug)]
pub enum TaskTestStatus {
    Started,
    Success,
    Error,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TaskTestResult {
    // required
    pub status: TaskTestStatus,
    #[serde(default)]
    pub result: Value,
    #[serde(default)]
    pub estimated_memory: Option<f64>,
    #[serde(default)]
    pub time_spent: Option<f64>,
    // string, or array
    #[serde(default)]
    pub error: Value,
    // TODO: dict
    #[serde(default)]
    pub more: Value,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum TaskStatus {
    #[serde(rename = "Error creating")]
    ErrorCreating,
    #[serde(rename = "Not started")]
    NotStarted,
    #[serde(rename = "Creating the deposit")]
    CreatingDeposit,
    Sending,
    Creating,
    Waiting,
    Starting,
    Computing,
    Finished,
    Aborted,
    Timeout,
    #[serde(rename = "Restart")]
    Restarted,
}

impl TaskStatus {
    pub fn is_active(&self) -> bool {
        match self {
            TaskStatus::Sending
            | TaskStatus::Waiting
            | TaskStatus::Starting
            | TaskStatus::Computing => true,
            _ => false,
        }
    }

    pub fn is_completed(&self) -> bool {
        match self {
            TaskStatus::Finished
            | TaskStatus::Aborted
            | TaskStatus::Timeout
            | TaskStatus::Restarted => true,
            _ => false,
        }
    }

    pub fn is_preparing(&self) -> bool {
        match self {
            TaskStatus::NotStarted | TaskStatus::CreatingDeposit => true,
            _ => false,
        }
    }
}

// TODO: Add more fields
// TODO: Add generic deserialization to different task definition schemas.
#[derive(Serialize, Deserialize, Debug)]
pub struct TaskInfo {
    pub id: String,
    pub status: TaskStatus,
    /// Remaining time in seconds
    pub time_remaining: Option<f64>,
    pub subtasks_count: Option<u32>,
    pub progress: Option<f64>,

    pub cost: Option<BigDecimal>,
    pub fee: Option<BigDecimal>,
    pub estimated_cost: Option<BigDecimal>,
    pub estimated_fee: Option<BigDecimal>,

    #[serde(flatten)]
    pub extra: Map<String, Value>,
}

#[derive(Serialize, Deserialize, Debug)]
pub enum SubtaskStatus {
    Starting,
    Downloading,
    Verifying,
    #[serde(rename = "Failed - Resent")]
    FailedResent,
    Finished,
    Failure,
    Restart,
    Cancelled,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct SubtaskInfo {
    pub subtask_id: String,
    pub node_id: String,
    pub node_name: String,
    pub status: SubtaskStatus,
    pub progress: Option<f64>,
    pub time_started: Option<f64>,
    pub results: Vec<String>,
    pub stderr: Option<String>,
    pub stdout: Option<String>,

    #[serde(flatten)]
    pub extra: Map<String, Value>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct StatsCounters {
    pub session: u32,
    pub global: u32,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct SubtasksInfo {
    pub progress: f64,
    #[serde(flatten)]
    pub extra: HashMap<String, serde_json::Value>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ProviderState {
    pub status: String,
    pub subtask: Option<SubtasksInfo>,
    pub environment: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct SubtaskStats {
    pub provider_state: ProviderState,
    #[serde(rename(serialize = "subtasks_in_network"))]
    pub in_network: u32,
    #[serde(rename(serialize = "subtasks_supported"))]
    pub supported: u32,
    pub subtasks_accepted: StatsCounters,
    pub subtasks_computed: StatsCounters,
    pub subtasks_rejected: StatsCounters,
    pub subtasks_with_errors: StatsCounters,
    pub subtasks_with_timeout: StatsCounters,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct UnsupportInfo {
    pub reason: String,
    #[serde(rename = "ntasks")]
    pub n_tasks: u32,
    /// avg (if available) is the current most
    ///  typical corresponding value.  For unsupport reason
    ///  MAX_PRICE avg is the average price of all tasks currently observed in
    ///  the network. For unsupport reason APP_VERSION avg is
    ///  the most popular app version of all tasks currently observed in the
    ///  network.
    pub avg: serde_json::Value,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CompEnvStatus {
    pub id: String,
    pub supported: bool,
    pub accepted: bool,
    pub performance: Option<f64>,
    pub min_accepted: f64,
    pub description: String,
}