connectivity_processor.c
Go to the documentation of this file.
1 
10 #include "multipart.h"
11 #include "store.h"
12 #include "event.h"
14 #include "timeseries.h"
15 #include "file.h"
16 #include "custom_data.h"
17 #include "json.h"
18 #include "mcl_core/mcl_memory.h"
20 #include "mcl_core/mcl_file_util.h"
21 #include "mcl_core/mcl_json_util.h"
22 #include "mcl_core/mcl_random.h"
23 
31 {
40 
45 {
46  "application/octet-stream",
47  "text/plain",
48  "application/json",
49  "multipart/mixed",
50  "application/x-www-form-urlencoded",
51  "chunked"
52 };
53 
54 // Function to scan store items according to exchange result.
55 static mcl_error_t _scan_store_after_exchange(store_t *store, mcl_bool_t *store_fully_processed, mcl_error_t exchange_response);
56 
57 // Function to add all http headers for the exchange request.
58 static mcl_error_t _add_http_headers_for_exchange(connectivity_processor_t *processor, mcl_http_request_t *request, const char *boundary);
59 
60 // Function used to add Content-Type header to the request including the main boundary for the multipart message and charset.
61 static mcl_error_t _add_multipart_mixed_content_type_header(mcl_http_request_t *request, const char *boundary);
62 
63 // Function used to add authorization header to the request.
64 static mcl_error_t _add_authorization_header(mcl_http_request_t *request, const char *access_token);
65 
66 // Function used to generate correlation id for the request.
67 static mcl_error_t _generate_correlation_id_string(char *correlation_id);
68 
69 // Function used to evaluate the response to the exchange request.
70 static mcl_error_t _evaluate_response_for_exchange(mcl_http_response_t *response, const char *correlation_id);
71 
72 // Function used to add file tuple with callback.
73 static mcl_size_t _file_payload_callback(char *buffer, mcl_size_t size, mcl_size_t count, void *user_context);
74 
75 // Function used to add custom data tuple with callback.
76 static mcl_size_t _custom_data_payload_callback(char *buffer, mcl_size_t size, mcl_size_t count, void *user_context);
77 
78 // Function to prepare body for store.
79 static mcl_error_t _prepare_body_for_store(store_t *store, mcl_size_t max_http_payload_size, char *boundary, mcl_uint8_t **body, mcl_size_t *body_size);
80 
81 // Function to prepare body for item (including store).
82 static mcl_error_t _prepare_body(mcl_item_t *item, mcl_size_t max_http_payload_size, char *boundary, mcl_uint8_t **body, mcl_size_t *body_size);
83 
84 // Function to add custom data to buffer.
85 static mcl_error_t _add_custom_data_to_buffer(char *buffer, mcl_size_t *buffer_size, custom_data_t *custom_data, const char *boundary);
86 
87 // Function to calculate size of each item in store.
88 static mcl_error_t _check_store_size(store_t *store, mcl_size_t max_http_payload_size);
89 
90 // Revert status of items due to exchange failure.
91 static void _revert_store_item_status(store_t *store);
92 
93 // Remove items which are successfully exchanged from store.
95 
96 // Prepare data point mapping json.
97 static mcl_error_t _prepare_mapping_json(mapping_t *mapping, const char *agent_id, char **json);
98 
99 #define CORRELATION_ID_BYTE_LENGTH 16
100 #define CORRELATION_ID_BUFFER_LENGTH ((CORRELATION_ID_BYTE_LENGTH * 2) + MCL_NULL_CHAR_SIZE)
101 
106 {
110 
112 {
113  mcl_error_t code;
114  mcl_bool_t is_store = MCL_FALSE;
115  mcl_bool_t store_fully_processed = MCL_FALSE;
116 
117  MCL_DEBUG_ENTRY("connectivity_processor_t *connectivity_processor = <%p>, mcl_item_t *item = <%p>", connectivity_processor, item);
118 
119  // Validate item structure.
120  if (MCL_ITEM_PREAMBLE != (item)->preamble)
121  {
122  MCL_DEBUG_LEAVE("retVal = <%d>", MCL_INVALID_PARAMETER);
123  return MCL_INVALID_PARAMETER;
124  }
125 
126  // Validate item.
127  switch ((item)->type)
128  {
129  case MCL_ITEM_TYPE_EVENT:
130  code = event_validate((event_t *) item);
131  break;
132 
135  break;
136 
138  code = timeseries_validate((timeseries_t *) item);
139  break;
140 
141  case MCL_ITEM_TYPE_FILE:
142  code = file_validate((file_t *) item);
143  break;
144 
146  code = custom_data_validate((custom_data_t *) item);
147  break;
148 
149  case MCL_ITEM_TYPE_STORE:
150  is_store = MCL_TRUE;
151  code = _check_store_size((store_t *) item, connectivity_processor->max_http_payload_size);
152  break;
153 
154  default:
155  code = MCL_FAIL;
156  break;
157  }
158 
159  do
160  {
161  mcl_uint8_t *body = MCL_NULL;
162  mcl_size_t body_size;
163  mcl_http_request_t *request = MCL_NULL;
164  mcl_http_response_t *response = MCL_NULL;
165  char *boundary = MCL_NULL;
166  char correlation_id[CORRELATION_ID_BUFFER_LENGTH];
168 
169  // Generate boundary.
170  if (MCL_OK == code)
171  {
172  code = multipart_generate_boundary(&boundary);
173  }
174 
175  // Prepare HTTP request body.
176  if (MCL_OK == code)
177  {
178  code = _prepare_body(item, connectivity_processor->max_http_payload_size, boundary, &body, &body_size);
179  }
180 
181  if (MCL_OK == code)
182  {
183  code = mcl_http_request_initialize(&request);
184  }
185 
186  if (MCL_OK == code)
187  {
189  }
190 
191  if (MCL_OK == code)
192  {
194  }
195 
196  if (MCL_OK == code)
197  {
199  }
200 
201  if (MCL_OK == code)
202  {
203  code = mcl_http_request_set_parameter(request, MCL_HTTP_REQUEST_PARAMETER_URL, connectivity_processor->exchange_url);
204  }
205 
206  // Add http headers to the request.
207  if (MCL_OK == code)
208  {
209  code = _add_http_headers_for_exchange(connectivity_processor, request, boundary);
210  }
211 
212  // Free boundary since it is not necessary anymore.
213  MCL_FREE(boundary);
214 
215  // Generate correlation id.
216  if (MCL_OK == code)
217  {
218  code = _generate_correlation_id_string(correlation_id);
219  }
220 
221  // Add correlation id to request for debugging purposes.
222  if (MCL_OK == code)
223  {
224  code = mcl_http_request_add_header(request, "Correlation-ID", correlation_id);
225  }
226 
227  // Send the request and get the response.
228  if (MCL_OK == code)
229  {
230  code = mcl_http_client_send(connectivity_processor->http_client, request, &response);
231  }
232 
233  // Destroy request since it is not necessary anymore.
234  mcl_http_request_destroy(&request);
235  MCL_FREE(body);
236 
237  // Evaluate the response.
238  if (MCL_OK == code)
239  {
240  code = _evaluate_response_for_exchange(response, correlation_id);
241  }
242 
243  // Destroy http response.
244  mcl_http_response_destroy(&response);
245 
246  if (MCL_TRUE == is_store)
247  {
248  code = _scan_store_after_exchange((store_t *) item, &store_fully_processed, code);
249  }
250  } while ((MCL_TRUE == is_store) && (MCL_FALSE == store_fully_processed) && (MCL_OK == code));
251 
252  MCL_DEBUG_LEAVE("retVal = <%d>", code);
253  return code;
254 }
255 
257 {
258  mcl_error_t code;
259  mcl_http_request_t *http_request = MCL_NULL;
260  mcl_http_response_t *http_response = MCL_NULL;
261  char *json_string = MCL_NULL;
262  char correlation_id[CORRELATION_ID_BUFFER_LENGTH];
263 
264  MCL_DEBUG_ENTRY("connectivity_processor_t *connectivity_processor = <%p>, mapping_t *mapping = <%p>", connectivity_processor, mapping);
265 
266  // Validate mapping.
267  code = mapping_validate(mapping);
268 
269  // Prepare json for http request.
270  if (MCL_OK == code)
271  {
272  code = _prepare_mapping_json(mapping, connectivity_processor->agent_id, &json_string);
273  }
274 
275  if (MCL_OK == code)
276  {
277  code = mcl_http_request_initialize(&http_request);
278  }
279 
280  if (MCL_OK == code)
281  {
282  code = mcl_http_request_set_parameter(http_request, MCL_HTTP_REQUEST_PARAMETER_BODY, json_string);
283  }
284 
285  if (MCL_OK == code)
286  {
287  mcl_size_t body_size = mcl_string_util_strlen(json_string);
288 
290  }
291 
292  if (MCL_OK == code)
293  {
295 
297  }
298 
299  if (MCL_OK == code)
300  {
301  code = mcl_http_request_set_parameter(http_request, MCL_HTTP_REQUEST_PARAMETER_URL, connectivity_processor->mapping_url);
302  }
303 
304  // Generate correlation id.
305  if (MCL_OK == code)
306  {
307  code = _generate_correlation_id_string(correlation_id);
308  }
309 
310  // Add correlation id to request for debugging purposes.
311  if (MCL_OK == code)
312  {
313  code = mcl_http_request_add_header(http_request, "Correlation-ID", correlation_id);
314  }
315 
316  if (MCL_OK == code)
317  {
319  }
320 
321  if (MCL_OK == code)
322  {
324  }
325 
326  if (MCL_OK == code)
327  {
328  code = _add_authorization_header(http_request, connectivity_processor->access_token);
329  }
330 
331  if (MCL_OK == code)
332  {
333  code = mcl_http_client_send(connectivity_processor->http_client, http_request, &http_response);
334  }
335 
336  mcl_http_request_destroy(&http_request);
337  MCL_FREE(json_string);
338 
339  if (MCL_OK == code)
340  {
341  if (MCL_HTTP_STATUS_CODE_CREATED == http_response->status_code)
342  {
343  MCL_INFO("HTTP 201 OK received from server for the request with correlation-id = \"%s\".", correlation_id);
344  code = MCL_OK;
345  }
346  else
347  {
348  MCL_ERROR("HTTP <%d> received from server for the request with correlation-id = \"%s\".", http_response->status_code, correlation_id);
349 
350  if (MCL_NULL != http_response->payload)
351  {
352  MCL_ERROR("HTTP Response:\n%.*s", http_response->payload_size, http_response->payload);
353  }
354 
355  code = mcl_http_response_get_status(http_response);
356 
357  if (MCL_OK == code)
358  {
359  // This means that HTTP 200 is received instead of 201.
361  }
362  }
363  }
364 
365  mcl_http_response_destroy(&http_response);
366 
367  MCL_DEBUG_LEAVE("retVal = <%d>", code);
368  return code;
369 }
370 
372 {
373  mcl_error_t code = MCL_OK;
374  mcl_http_request_t *http_request = MCL_NULL;
375  mcl_http_response_t *http_response = MCL_NULL;
376  mcl_size_t agent_id_length;
377  mcl_size_t hostname_length;
378  mcl_size_t url_length;
379  char *url;
380  *configuration = MCL_NULL;
381 
382  MCL_DEBUG_ENTRY("connectivity_processor_t *connectivity_processor = <%p>, data_source_configuration_t **configuration = <%p>",
383  connectivity_processor, configuration);
384 
385  // Calculate url length.
386  agent_id_length = mcl_string_util_strlen(connectivity_processor->agent_id);
387  hostname_length = mcl_string_util_strlen(connectivity_processor->hostname);
388  url_length = hostname_length + agent_id_length + sizeof("/api/agentmanagement/v3/agents//dataSourceConfiguration");
389 
390  url = MCL_MALLOC(url_length);
391 
392  if (MCL_NULL == url)
393  {
394  code = MCL_OUT_OF_MEMORY;
395  }
396  else
397  {
398  char *position = url;
399 
400  mcl_string_util_memcpy(position, connectivity_processor->hostname, hostname_length);
401  position += hostname_length;
402 
403  mcl_string_util_memcpy(position, "/api/agentmanagement/v3/agents/", sizeof("/api/agentmanagement/v3/agents/") - MCL_NULL_CHAR_SIZE);
404  position += sizeof("/api/agentmanagement/v3/agents/") - MCL_NULL_CHAR_SIZE;
405 
406  mcl_string_util_memcpy(position, connectivity_processor->agent_id, agent_id_length);
407  position += agent_id_length;
408 
409  mcl_string_util_memcpy(position, "/dataSourceConfiguration", sizeof("/dataSourceConfiguration"));
410  }
411 
412  // Initialize http request.
413  if (MCL_OK == code)
414  {
415  code = mcl_http_request_initialize(&http_request);
416  }
417 
418  // Set url.
419  if (MCL_OK == code)
420  {
422  }
423 
424  MCL_FREE(url);
425 
426  // Set method.
427  if (MCL_OK == code)
428  {
430 
432  }
433 
434  // Add authorization header.
435  if (MCL_OK == code)
436  {
437  code = _add_authorization_header(http_request, connectivity_processor->access_token);
438  }
439 
440  // Response content type application/json.
441  if (MCL_OK == code)
442  {
444  }
445 
446  // Send the request.
447  if (MCL_OK == code)
448  {
449  code = mcl_http_client_send(connectivity_processor->http_client, http_request, &http_response);
450  }
451 
452  mcl_http_request_destroy(&http_request);
453 
454  if (MCL_OK == code)
455  {
456  if (MCL_HTTP_STATUS_CODE_SUCCESS == http_response->status_code)
457  {
458  code = json_parse_item((char *) http_response->payload, http_response->payload_size, (void**)configuration);
459  }
460  else
461  {
462  code = mcl_http_response_get_status(http_response);
463  }
464  }
465 
466  mcl_http_response_destroy(&http_response);
467 
468  MCL_DEBUG_LEAVE("retVal = <%d>", code);
469  return code;
470 }
471 
472 static mcl_error_t _scan_store_after_exchange(store_t *store, mcl_bool_t *store_fully_processed, mcl_error_t exchange_response)
473 {
474  mcl_error_t code = exchange_response;
475 
476  MCL_DEBUG_ENTRY("store_t *store = <%p>, mcl_bool_t *store_fully_processed = <%p>, mcl_error_t exchange_response = <%d>",
477  store, store_fully_processed, exchange_response);
478 
479  if (MCL_OK == code)
480  {
481  *store_fully_processed = _remove_uploaded_store_items(store);
482  }
483 
484  if (((MCL_TRUE == *store_fully_processed) && (0 != store->item_list->count)) || (MCL_OK != code))
485  {
487 
488  // If code is MCL_OK, indicate that the store has at least one item.
489  if (MCL_OK == code)
490  {
492  }
493  }
494 
495  MCL_DEBUG_LEAVE("retVal = <%d>", code);
496  return code;
497 }
498 
500 {
501  mcl_error_t code;
502 
503  MCL_DEBUG_ENTRY("connectivity_processor_t *processor = <%p>, mcl_http_request_t *request = <%p>, const char *boundary = <%p>",
504  processor, request, boundary);
505 
506  // Add boundary.
507  code = _add_multipart_mixed_content_type_header(request, boundary);
508 
509  if (MCL_OK == code)
510  {
512  }
513 
514  if (MCL_OK == code)
515  {
516  code = _add_authorization_header(request, processor->access_token);
517  }
518 
519  MCL_DEBUG_LEAVE("retVal = <%d>", code);
520  return code;
521 }
522 
524 {
525  mcl_error_t code;
526  char *header_value;
527  mcl_size_t header_value_length;
528  mcl_size_t constant_part_length = 24;
529 
530  MCL_DEBUG_ENTRY("mcl_http_request_t *request = <%p>, const char *boundary = <%p>", request, boundary);
531 
532  header_value_length = mcl_string_util_strlen(content_type_values[CONTENT_TYPE_MULTIPART_MIXED]) + mcl_string_util_strlen(boundary) + constant_part_length;
533 
534  header_value = MCL_MALLOC(header_value_length + 1);
535 
536  if (MCL_NULL == header_value)
537  {
538  code = MCL_OUT_OF_MEMORY;
539  MCL_ERROR_STRING("Memory can not be allocated for content-type header.");
540  }
541  else
542  {
543  // Compose header value.
544  code = mcl_string_util_snprintf(header_value, header_value_length + 1, "%s;boundary=%s;charset=utf-8",
545  content_type_values[CONTENT_TYPE_MULTIPART_MIXED], boundary);
546 
547  // Add header to http request.
548  if (MCL_OK == code)
549  {
550  code = mcl_http_request_add_header(request, "Content-Type", header_value);
551  }
552 
553  // Free header value.
554  MCL_FREE(header_value);
555  }
556 
557  MCL_DEBUG_LEAVE("retVal = <%d>", code);
558  return code;
559 }
560 
561 static mcl_error_t _add_authorization_header(mcl_http_request_t *request, const char *access_token)
562 {
563  mcl_error_t code;
564  char *jwt = MCL_NULL;
565 
566  MCL_DEBUG_ENTRY("mcl_http_request_t *request = <%p>, const char *access_token = <%s>", request, access_token);
567 
568  // Add "Bearer" before access token.
569  code = mcl_string_util_concatenate("Bearer ", access_token, &jwt);
570 
571  if (MCL_OK == code)
572  {
573  code = mcl_http_request_add_header(request, "Authorization", jwt);
574  }
575 
576  MCL_FREE(jwt);
577 
578  MCL_DEBUG_LEAVE("retVal = <%d>", code);
579  return code;
580 }
581 
582 static mcl_error_t _evaluate_response_for_exchange(mcl_http_response_t *response, const char *correlation_id)
583 {
584  mcl_error_t code;
585 
586  MCL_DEBUG_ENTRY("mcl_http_response_t *response = <%p>, const char *correlation_id = <%p>", response, correlation_id);
587 
588  if (MCL_HTTP_STATUS_CODE_SUCCESS == response->status_code)
589  {
590  MCL_INFO("HTTP 200 OK received from server for the request with correlation-id = \"%s\".", correlation_id);
591  code = MCL_OK;
592  }
593  else
594  {
595  MCL_ERROR("HTTP <%d> received from server for the request with correlation-id = \"%s\".", response->status_code, correlation_id);
596 
597  if (MCL_NULL != response->payload)
598  {
599  MCL_ERROR("HTTP Response:\n%.*s", response->payload_size, response->payload);
600  }
601 
602  code = mcl_http_response_get_status(response);
603  }
604 
605  MCL_DEBUG_LEAVE("retVal = <%d>", code);
606  return code;
607 }
608 
609 static mcl_error_t _generate_correlation_id_string(char *correlation_id)
610 {
611  mcl_error_t code;
612  mcl_int16_t index;
613  unsigned char *id = (unsigned char*) correlation_id;
614 
615  MCL_DEBUG_ENTRY("char *correlation_id = <%p>", correlation_id);
616 
618 
619  for (index = 0; (index < CORRELATION_ID_BYTE_LENGTH) && (MCL_OK == code); ++index)
620  {
621  code = mcl_string_util_snprintf(&(id[index * 2]), (mcl_size_t) 3, "%02x", (unsigned int)id[index + CORRELATION_ID_BYTE_LENGTH]);
622  }
623 
624  MCL_DEBUG_LEAVE("retVal = <%d>", code);
625  return code;
626 }
627 
628 static mcl_size_t _file_payload_callback(char *buffer, mcl_size_t size, mcl_size_t count, void *user_context)
629 {
630  mcl_size_t written_size = 0;
631 
632  MCL_DEBUG_ENTRY("char *buffer = <%p>, mcl_size_t size = <%u>, mcl_size_t count = <%u>, void *user_context = <%p>", buffer, size, count, user_context);
633 
634  mcl_file_util_fread(buffer, size, count, ((file_t *) user_context)->payload->file_descriptor, &written_size);
635 
636  MCL_DEBUG_LEAVE("retVal = <%u>", written_size);
637  return written_size;
638 }
639 
640 static mcl_size_t _custom_data_payload_callback(char *buffer, mcl_size_t size, mcl_size_t count, void *user_context)
641 {
643  mcl_size_t remaining_size;
644  mcl_size_t write_size;
645 
646  MCL_DEBUG_ENTRY("char *buffer = <%p>, mcl_size_t size = <%u>, mcl_size_t count = <%u>, void *user_context = <%p>", buffer, size, count, user_context);
647 
648  // Calculate write size.
649  custom_data = ((custom_data_callback_user_context *) user_context)->custom_data;
650  remaining_size = custom_data->payload->size - ((custom_data_callback_user_context *) user_context)->offset;
651  write_size = (remaining_size < (size * count)) ? remaining_size : (size * count);
652 
653  if (0 != write_size)
654  {
655  mcl_string_util_memcpy(buffer, custom_data->payload->buffer + ((custom_data_callback_user_context *) user_context)->offset, write_size);
656  ((custom_data_callback_user_context *) user_context)->offset += write_size;
657  }
658 
659  MCL_DEBUG_LEAVE("retVal = <%u>", write_size);
660  return write_size;
661 }
662 
663 static mcl_error_t _add_item_to_buffer(char *buffer, mcl_size_t *buffer_size, mcl_item_t *item, const char *boundary)
664 {
665  mcl_error_t code;
666 
667  MCL_DEBUG_ENTRY("char *buffer = <%p>, mcl_size_t *buffer_size = <%p>, mcl_item_t *item = <%p>, const char *boundary = <%p>",
668  buffer, buffer_size, item, boundary);
669 
670  switch (item->type)
671  {
672  case MCL_ITEM_TYPE_EVENT:
675  code = multipart_add_tuple(buffer, buffer_size, item, boundary);
676  break;
677 
678  case MCL_ITEM_TYPE_FILE:
679  mcl_file_util_rewind(((file_t *) item)->payload->file_descriptor);
680  code = multipart_add_tuple_with_callback(buffer, buffer_size, item, boundary,
682  break;
683 
685  code = _add_custom_data_to_buffer(buffer, buffer_size, (custom_data_t *) item, boundary);
686  break;
687 
688  default:
689  code = MCL_FAIL;
690  break;
691  }
692 
693  MCL_DEBUG_LEAVE("retVal = <%d>", code);
694  return code;
695 }
696 
698 {
699  mcl_size_t item_size = 0;
700 
701  MCL_DEBUG_ENTRY("mcl_item_t *item = <%p>", item);
702 
703  switch (item->type)
704  {
705  case MCL_ITEM_TYPE_EVENT:
709  break;
710 
711  case MCL_ITEM_TYPE_FILE:
713  break;
714 
716  item_size = multipart_get_tuple_size(item, ((custom_data_t *) item)->payload->content_type);
717  break;
718 
719  default:
720  break;
721  }
722 
723  MCL_DEBUG_LEAVE("retVal = <%lu>", item_size);
724  return item_size;
725 }
726 
727 static mcl_error_t _check_store_size(store_t *store, mcl_size_t max_http_payload_size)
728 {
729  mcl_error_t code = MCL_OK;
730  mcl_list_node_t *item_node = MCL_NULL;
731  mcl_size_t overhead_size;
732  mcl_bool_t all_ignored = MCL_TRUE;
733 
734  MCL_DEBUG_ENTRY("store_t *store = <%p>, mcl_size_t max_http_payload_size = <%lu>", store, max_http_payload_size);
735 
736  if (0 == store->item_list->count)
737  {
738  code = MCL_STORE_IS_EMPTY;
739  }
740  else
741  {
742  overhead_size = multipart_get_overhead_size();
743  mcl_list_reset(store->item_list);
744 
745  while (MCL_OK == mcl_list_next(store->item_list, &item_node))
746  {
747  store_item_t *store_item = (store_item_t *)item_node->data;
748  store_item->item_size = _get_item_size(store_item->item);
749 
750  if (max_http_payload_size < (overhead_size + store_item->item_size))
751  {
752  store_item->status = STORE_ITEM_STATUS_IGNORED;
753  }
754  else
755  {
756  all_ignored = MCL_FALSE;
757  }
758  }
759 
760  if (MCL_TRUE == all_ignored)
761  {
763  }
764  }
765 
766  MCL_DEBUG_LEAVE("retVal = <%d>", code);
767  return code;
768 }
769 
770 static mcl_size_t _select_store_items_to_exchange(store_t *store, mcl_size_t max_http_payload_size)
771 {
772  mcl_list_node_t *item_node = MCL_NULL;
773  mcl_size_t remaining_size = max_http_payload_size;
774  mcl_size_t needed_buffer_size;
775 
776  MCL_DEBUG_ENTRY("store_t *store = <%p>, mcl_size_t max_http_payload_size = <%lu>", store, max_http_payload_size);
777 
778  remaining_size -= multipart_get_overhead_size();
779  mcl_list_reset(store->item_list);
780 
781  while (MCL_OK == mcl_list_next(store->item_list, &item_node))
782  {
783  store_item_t *store_item = (store_item_t *)item_node->data;
784 
785  if (remaining_size >= store_item->item_size)
786  {
787  store_item->status = STORE_ITEM_STATUS_SELECTED;
788  remaining_size -= store_item->item_size;
789  }
790  }
791 
792  needed_buffer_size = max_http_payload_size - remaining_size;
793  mcl_list_reset(store->item_list);
794 
795  MCL_DEBUG_LEAVE("retVal = <%lu>", needed_buffer_size);
796  return needed_buffer_size;
797 }
798 
800 {
801  mcl_list_node_t *item_node = MCL_NULL;
802  store_item_t *store_item = MCL_NULL;
803 
804  MCL_DEBUG_ENTRY("store_t *store = <%p>", store);
805 
806  while (MCL_OK == mcl_list_next(store->item_list, &item_node))
807  {
808  store_item_t *current_store_item = (store_item_t *)item_node->data;
809 
810  if (STORE_ITEM_STATUS_SELECTED == current_store_item->status)
811  {
812  store_item = current_store_item;
813  break;
814  }
815  }
816 
817  MCL_DEBUG_LEAVE("retVal = <%p>", store_item);
818  return store_item;
819 }
820 
821 static mcl_error_t _prepare_body_for_store(store_t *store, mcl_size_t max_http_payload_size, char *boundary, mcl_uint8_t **body, mcl_size_t *body_size)
822 {
823  mcl_size_t buffer_size;
824  mcl_size_t remaining_size;
825  mcl_error_t code;
826 
827  MCL_DEBUG_ENTRY("store_t *store = <%p>, mcl_size_t max_http_payload_size = <%lu>, char *boundary = <%p>, mcl_uint8_t **body = <%p>, "\
828  "mcl_size_t *body_size = <%p>", store, max_http_payload_size, boundary, body, body_size);
829 
830  *body_size = 0;
831  remaining_size = _select_store_items_to_exchange(store, max_http_payload_size);
832  buffer_size = remaining_size;
833 
834  *body = MCL_MALLOC(buffer_size);
835 
836  if (MCL_NULL == *body)
837  {
838  code = MCL_OUT_OF_MEMORY;
839  }
840  else
841  {
842  for (;;)
843  {
844  store_item_t *store_item = _get_next_selected_store_item(store);
845 
846  if (MCL_NULL == store_item)
847  {
848  break;
849  }
850 
851  code = _add_item_to_buffer((char *)(*body + (buffer_size - remaining_size)), &remaining_size, store_item->item, boundary);
852 
853  if (MCL_OK == code)
854  {
855  store_item->status = STORE_ITEM_STATUS_PROCESSED;
856  }
857  else
858  {
859  MCL_ERROR("Item <%p> could not be added to body. Error: %s", store_item->item, MCL_CONNECTIVITY_CODE_TO_STRING(code));
860  }
861  }
862 
863  if ((buffer_size != remaining_size) && (MCL_OK == multipart_close((char *)(*body + (buffer_size - remaining_size)), &remaining_size, boundary)))
864  {
865  *body_size = buffer_size - remaining_size;
866  code = MCL_OK;
867  }
868  else
869  {
870  code = MCL_FAIL;
871  MCL_FREE(*body);
872  }
873  }
874 
875  MCL_DEBUG_LEAVE("retVal = <%d>", code);
876  return code;
877 }
878 
879 static mcl_error_t _prepare_body(mcl_item_t *item, mcl_size_t max_http_payload_size, char *boundary, mcl_uint8_t **body, mcl_size_t *body_size)
880 {
881  mcl_error_t code = MCL_OK;
882 
883  MCL_DEBUG_ENTRY("mcl_item_t *item = <%p>, mcl_size_t max_http_payload_size = <%lu>, char *boundary = <%p>, mcl_uint8_t **body = <%p>, "\
884  "mcl_size_t *body_size = <%lu>", item, max_http_payload_size, boundary, body, body_size);
885 
886  if (MCL_ITEM_TYPE_STORE == item->type)
887  {
888  code = _prepare_body_for_store((store_t *) item, max_http_payload_size, boundary, body, body_size);
889  }
890  else
891  {
892  mcl_size_t remaining_size;
893  *body_size = _get_item_size(item) + multipart_get_overhead_size();
894  remaining_size = *body_size;
895 
896  if (*body_size > max_http_payload_size)
897  {
899  }
900 
901  if (MCL_OK == code)
902  {
903  *body = MCL_MALLOC(*body_size);
904 
905  if (MCL_NULL == *body)
906  {
907  code = MCL_OUT_OF_MEMORY;
908  *body_size = 0;
909  }
910  }
911 
912  if (MCL_OK == code)
913  {
914  code = _add_item_to_buffer((char *) *body, &remaining_size, item, boundary);
915 
916  if (MCL_OK == code)
917  {
918  code = multipart_close((char *) (*body + (*body_size - remaining_size)), &remaining_size, boundary);
919  }
920 
921  if (MCL_OK != code)
922  {
923  MCL_FREE(*body);
924  *body_size = 0;
925  }
926  else if (0 != remaining_size)
927  {
928  *body_size -= remaining_size;
929  }
930  }
931  }
932 
933  MCL_DEBUG_LEAVE("retVal = <%d>", code);
934  return code;
935 }
936 
938 {
939  mcl_size_t index;
940  mcl_size_t count = store->item_list->count;
941 
942  MCL_DEBUG_ENTRY("store_t *store = <%p>", store);
943 
944  mcl_list_reset(store->item_list);
945 
946  for (index = 0; index < count; ++index)
947  {
948  mcl_list_node_t *node = MCL_NULL;
949 
950  (void) mcl_list_next(store->item_list, &node);
951  ((store_item_t *) node->data)->status = STORE_ITEM_STATUS_READY;
952  }
953 
954  MCL_DEBUG_LEAVE("retVal = <void>");
955 }
956 
958 {
959  mcl_bool_t fully_processed = MCL_TRUE;
960  mcl_size_t index;
961  mcl_size_t count = store->item_list->count;
962 
963  MCL_DEBUG_ENTRY("store_t *store = <%p>", store);
964 
965  mcl_list_reset(store->item_list);
966 
967  for (index = 0; index < count; ++index)
968  {
969  mcl_list_node_t *node = MCL_NULL;
970 
971  (void) mcl_list_next(store->item_list, &node);
972 
973  switch (((store_item_t *) node->data)->status)
974  {
977  break;
978 
980  ((store_item_t *) node->data)->status = STORE_ITEM_STATUS_READY;
981  fully_processed = MCL_FALSE;
982  break;
983 
985  fully_processed = MCL_FALSE;
986  break;
987 
988  default:
989  break;
990  }
991  }
992 
993  MCL_DEBUG_LEAVE("retVal = <%u>", fully_processed);
994  return fully_processed;
995 }
996 
997 static mcl_error_t _add_custom_data_to_buffer(char *buffer, mcl_size_t *buffer_size, custom_data_t *custom_data, const char *boundary)
998 {
999  mcl_error_t code;
1000  custom_data_callback_user_context user_context;
1001 
1002  MCL_DEBUG_ENTRY("char *buffer = <%p>, mcl_size_t *buffer_size = <%p>, custom_data_t *custom_data = <%p>, const char *boundary = <%p>",
1003  buffer, buffer_size, custom_data, boundary);
1004 
1005  // Set user context.
1006  user_context.custom_data = custom_data;
1007  user_context.offset = 0;
1008 
1009  // Add custom data to http_request.
1010  code = multipart_add_tuple_with_callback(buffer, buffer_size, custom_data, boundary,
1011  custom_data->payload->content_type, _custom_data_payload_callback, &user_context);
1012 
1013  MCL_DEBUG_LEAVE("retVal = <%d>", code);
1014  return code;
1015 }
1016 
1017 static mcl_error_t _prepare_mapping_json(mapping_t *mapping, const char *agent_id, char **json)
1018 {
1019  mcl_error_t code;
1020  mcl_json_t *json_object = MCL_NULL;
1021  *json = MCL_NULL;
1022 
1023  MCL_DEBUG_ENTRY("mapping_t *mapping = <%p>, const char *agent_id = <%p>, char **json = <%p>", mapping, agent_id, json);
1024 
1025  code = mcl_json_util_initialize(MCL_JSON_OBJECT, &json_object);
1026 
1027  if (MCL_OK == code)
1028  {
1029  code = mcl_json_util_add_string(json_object, "agentId", agent_id);
1030  }
1031 
1032  if (MCL_OK == code)
1033  {
1034  code = mcl_json_util_add_string(json_object, "dataPointId", mapping->data_point_id);
1035  }
1036 
1037  if (MCL_OK == code)
1038  {
1039  const char *entity_id;
1040 
1041  if (MCL_NULL == mapping->entity_id)
1042  {
1043  entity_id = agent_id;
1044  }
1045  else
1046  {
1047  entity_id = mapping->entity_id;
1048  }
1049 
1050  code = mcl_json_util_add_string(json_object, "entityId", entity_id);
1051  }
1052 
1053  if (MCL_OK == code)
1054  {
1055  code = mcl_json_util_add_string(json_object, "propertySetName", mapping->property_set_name);
1056  }
1057 
1058  if (MCL_OK == code)
1059  {
1060  code = mcl_json_util_add_string(json_object, "propertyName", mapping->property_name);
1061  }
1062 
1063  if (MCL_OK == code)
1064  {
1065  code = mcl_json_util_add_bool(json_object, "keepMapping", mapping->keep_mapping);
1066  }
1067 
1068  if (MCL_OK == code)
1069  {
1070  code = mcl_json_util_to_string(json_object, json);
1071  }
1072 
1073  mcl_json_util_destroy(&json_object);
1074 
1075  MCL_DEBUG_LEAVE("retVal = <%d>", code);
1076  return code;
1077 }
const mcl_uint8_t * buffer
Buffer of custom data.
Definition: custom_data.h:21
End of content type enumeration.
MCL_CORE_EXPORT mcl_error_t mcl_http_request_initialize(mcl_http_request_t **http_request)
static mcl_bool_t _remove_uploaded_store_items(store_t *store)
Content type is text plain.
Content type is multipart mixed.
MCL_CORE_EXPORT void mcl_file_util_rewind(void *file_descriptor)
Data source configuration module header file.
static mcl_error_t _prepare_body_for_store(store_t *store, mcl_size_t max_http_payload_size, char *boundary, mcl_uint8_t **body, mcl_size_t *body_size)
mcl_error_t event_validate(event_t *event)
Definition: event.c:158
size_t mcl_size_t
custom_data_payload_t * payload
Payload of custom data.
Definition: custom_data.h:34
Item type custom data.
Definition: item.h:27
Item type data source configuration.
Definition: item.h:28
E_MCL_HTTP_STATUS_CODE status_code
MCL_OK
mcl_size_t payload_size
MCL_CORE_EXPORT mcl_error_t mcl_json_util_initialize(E_MCL_JSON_TYPE json_type, mcl_json_t **root)
Content type is application/x-www-form-urlencoded.
mcl_error_t multipart_add_tuple(char *buffer, mcl_size_t *buffer_size, void *item, const char *boundary)
Definition: multipart.c:117
MCL_HTTP_REQUEST_PARAMETER_BODY_SIZE
void * item
Item to be added to the store.
Definition: store.h:39
void mcl_json_t
mcl_size_t size
Size of custom data.
Definition: custom_data.h:24
static mcl_error_t _prepare_body(mcl_item_t *item, mcl_size_t max_http_payload_size, char *boundary, mcl_uint8_t **body, mcl_size_t *body_size)
static mcl_error_t _scan_store_after_exchange(store_t *store, mcl_bool_t *store_fully_processed, mcl_error_t exchange_response)
static mcl_error_t _prepare_mapping_json(mapping_t *mapping, const char *agent_id, char **json)
MCL_CORE_EXPORT mcl_error_t mcl_http_request_set_parameter(mcl_http_request_t *http_request, E_MCL_HTTP_REQUEST_PARAMETER parameter, const void *value)
static mcl_size_t _select_store_items_to_exchange(store_t *store, mcl_size_t max_http_payload_size)
mcl_size_t multipart_get_overhead_size(void)
Definition: multipart.c:273
char * entity_id
Entity ID.
Definition: mapping.h:20
mcl_size_t multipart_get_tuple_size(void *item, const char *payload_content_type)
Definition: multipart.c:237
MCL_JSON_OBJECT
E_CONTENT_TYPE_VALUES
mcl_int32_t mcl_error_t
MCL_UNEXPECTED_RESULT_CODE
mcl_list_t * item_list
Contains store_item_t items.
Definition: store.h:51
#define CORRELATION_ID_BUFFER_LENGTH
void store_item_destroy(store_item_t **store_item)
Definition: store.c:186
Current item has been added to the current http request.
Definition: store.h:22
MCL_CORE_EXPORT mcl_error_t mcl_json_util_add_bool(mcl_json_t *root, const char *object_name, const mcl_bool_t bool_value)
#define MCL_DEBUG_ENTRY(...)
Store module header file.
mcl_error_t data_source_configuration_validate(data_source_configuration_t *data_source_configuration)
mcl_bool_t keep_mapping
Custom parameters for the mapping.
Definition: mapping.h:23
void(* mcl_list_item_destroy_callback)(void **item)
static mcl_size_t _get_item_size(mcl_item_t *item)
File module header file.
#define MCL_FALSE
mcl_error_t multipart_add_tuple_with_callback(char *buffer, mcl_size_t *buffer_size, void *item, const char *boundary, const char *payload_content_type, multipart_add_payload_callback callback, void *user_context)
Definition: multipart.c:130
MCL_CORE_EXPORT mcl_error_t mcl_json_util_add_string(mcl_json_t *root, const char *object_name, const char *object_value)
mcl_error_t connectivity_processor_exchange(connectivity_processor_t *connectivity_processor, mcl_item_t *item)
MCL_CORE_EXPORT mcl_size_t mcl_string_util_strlen(const char *buffer)
The store trying to be exchanged has no data inside.
MCL_CORE_EXPORT mcl_error_t mcl_list_next(mcl_list_t *list, mcl_list_node_t **node)
#define MCL_ERROR_STRING(string)
MCL_CORE_EXPORT void mcl_json_util_destroy(mcl_json_t **root)
MCL_CORE_EXPORT mcl_error_t mcl_http_request_add_header(mcl_http_request_t *http_request, const char *header_name, const char *header_value)
Content type is application json.
char * data_point_id
Mapping ID.
Definition: mapping.h:19
mcl_error_t timeseries_validate(timeseries_t *timeseries)
Definition: timeseries.c:109
Connectivity processor module header file for connectivity module.
MCL_CORE_EXPORT mcl_error_t mcl_list_remove_with_content(mcl_list_t *list, mcl_list_node_t *node, mcl_list_item_destroy_callback callback)
Item type timeseries.
Definition: item.h:24
static const char * content_type_values[CONTENT_TYPE_VALUES_END]
E_MCL_HTTP_METHOD
#define MCL_NULL
static mcl_size_t _file_payload_callback(char *buffer, mcl_size_t size, mcl_size_t count, void *user_context)
MCL_HTTP_REQUEST_PARAMETER_URL
Timeseries module header file.
Definition: store.h:48
MCL_CORE_EXPORT void mcl_file_util_fread(void *data, mcl_size_t size, mcl_size_t count, void *file_descriptor, mcl_size_t *actual_count)
#define MCL_ERROR(...)
Json module header file.
#define MCL_CONNECTIVITY_CODE_TO_STRING(code)
static mcl_size_t _custom_data_payload_callback(char *buffer, mcl_size_t size, mcl_size_t count, void *user_context)
Definition: file.h:31
static void _revert_store_item_status(store_t *store)
MCL_HTTP_GET
char * property_name
Property name.
Definition: mapping.h:22
static mcl_error_t _add_custom_data_to_buffer(char *buffer, mcl_size_t *buffer_size, custom_data_t *custom_data, const char *boundary)
mcl_error_t mapping_validate(mapping_t *mapping)
Definition: mapping.c:87
MCL_CORE_EXPORT mcl_error_t mcl_string_util_concatenate(const char *string_1, const char *string_2, char **result)
This item is not processed before, selected for current request.
Definition: store.h:21
#define MCL_FREE(p)
static mcl_error_t _check_store_size(store_t *store, mcl_size_t max_http_payload_size)
int16_t mcl_int16_t
E_CONTENT_TYPE_VALUES
MCL_CORE_EXPORT void mcl_http_request_destroy(mcl_http_request_t **http_request)
mcl_error_t json_parse_item(const char *json_string, mcl_size_t string_length, void **item)
Definition: json.c:467
mcl_uint8_t * payload
uint8_t mcl_uint8_t
mcl_size_t item_size
Size of the item in the store.
Definition: store.h:42
This item is not processed before.
Definition: store.h:20
MCL_CORE_EXPORT void mcl_string_util_memcpy(void *destination, const void *source, mcl_size_t count)
MCL_HTTP_REQUEST_PARAMETER_BODY
static mcl_error_t _add_authorization_header(mcl_http_request_t *request, const char *access_token)
E_MCL_STORE_ITEM_STATUS status
Status of item in the store.
Definition: store.h:40
MCL_CORE_EXPORT mcl_error_t mcl_http_response_get_status(mcl_http_response_t *http_response)
MCL_CORE_EXPORT mcl_error_t mcl_string_util_snprintf(char *string, mcl_size_t length, const char *format,...)
MCL_CORE_EXPORT void mcl_list_reset(mcl_list_t *list)
MCL_CORE_EXPORT mcl_error_t mcl_http_client_send(mcl_http_client_t *http_client, mcl_http_request_t *http_request, mcl_http_response_t **http_response)
Item type store.
Definition: item.h:29
Item type event.
Definition: item.h:25
static mcl_error_t _add_http_headers_for_exchange(connectivity_processor_t *processor, mcl_http_request_t *request, const char *boundary)
MCL_HTTP_POST
static mcl_error_t _add_multipart_mixed_content_type_header(mcl_http_request_t *request, const char *boundary)
mcl_error_t multipart_close(char *buffer, mcl_size_t *buffer_size, const char *boundary)
Definition: multipart.c:250
E_MCL_ITEM_TYPE type
Item type.
Definition: item.h:38
#define MCL_ITEM_PREAMBLE
Definition: item.h:17
mcl_error_t multipart_generate_boundary(char **boundary)
Definition: multipart.c:280
static mcl_error_t _add_item_to_buffer(char *buffer, mcl_size_t *buffer_size, mcl_item_t *item, const char *boundary)
Event module header file.
mcl_http_client_t * http_client
Item exceeds max http payload request.
Transfer encoding chunked.
Custom data module header file.
mcl_uint8_t mcl_bool_t
mcl_error_t file_validate(file_t *file)
Definition: file.c:121
MCL_CORE_EXPORT mcl_error_t mcl_random_generate_bytes(unsigned char *buffer, mcl_size_t size)
mcl_error_t connectivity_processor_get_data_source_configuration(connectivity_processor_t *connectivity_processor, data_source_configuration_t **configuration)
There is not enough space in the http request for the current item.
Definition: store.h:23
MCL_HTTP_STATUS_CODE_SUCCESS
static mcl_error_t _evaluate_response_for_exchange(mcl_http_response_t *response, const char *correlation_id)
Content type is application octet stream.
MCL_CORE_EXPORT void mcl_http_response_destroy(mcl_http_response_t **http_response)
static mcl_error_t _generate_correlation_id_string(char *correlation_id)
mcl_error_t custom_data_validate(custom_data_t *custom_data)
Definition: custom_data.c:104
MCL_OUT_OF_MEMORY
MCL_CORE_EXPORT mcl_error_t mcl_json_util_to_string(mcl_json_t *root, char **json_string)
Item type file.
Definition: item.h:26
#define MCL_NULL_CHAR_SIZE
mcl_size_t count
#define MCL_MALLOC(bytes)
#define CORRELATION_ID_BYTE_LENGTH
Definition: event.h:36
MCL_INVALID_PARAMETER
#define MCL_DEBUG_LEAVE(...)
#define MCL_TRUE
MCL_FAIL
MCL_HTTP_REQUEST_PARAMETER_METHOD
mcl_error_t connectivity_processor_create_mapping(connectivity_processor_t *connectivity_processor, mapping_t *mapping)
MCL_HTTP_STATUS_CODE_CREATED
#define MCL_INFO(...)
char * property_set_name
Property set name.
Definition: mapping.h:21
static store_item_t * _get_next_selected_store_item(store_t *store)
Multipart module header file.
char * content_type
Content type of custom data.
Definition: custom_data.h:22