dav/sync.c

changeset 536
877f7c4a203b
parent 533
5b9f20aa88c2
child 539
8deb52292c99
equal deleted inserted replaced
535:d814ee31c04f 536:877f7c4a203b
39 #include <ucx/string.h> 39 #include <ucx/string.h>
40 #include <ucx/utils.h> 40 #include <ucx/utils.h>
41 #include <ucx/properties.h> 41 #include <ucx/properties.h>
42 #include <dirent.h> 42 #include <dirent.h>
43 43
44 #include <math.h>
45
44 #include <libidav/webdav.h> 46 #include <libidav/webdav.h>
45 #include <libidav/utils.h> 47 #include <libidav/utils.h>
46 #include <libidav/crypto.h> 48 #include <libidav/crypto.h>
47 49
48 #include <libidav/session.h> 50 #include <libidav/session.h>
75 static DavPropName defprops[] = { 77 static DavPropName defprops[] = {
76 { "DAV:", "getetag" }, 78 { "DAV:", "getetag" },
77 { DAV_NS, "status" }, 79 { DAV_NS, "status" },
78 { DAV_NS, "finfo" }, 80 { DAV_NS, "finfo" },
79 { DAV_NS, "tags" }, 81 { DAV_NS, "tags" },
80 { DAV_NS, "xattributes" } 82 { DAV_NS, "xattributes" },
83 { DAV_NS, "split" }
81 }; 84 };
82 static size_t numdefprops = 5; 85 static size_t numdefprops = 6;
83 86
84 /* 87 /*
85 * strcmp version that works with NULL pointers 88 * strcmp version that works with NULL pointers
86 */ 89 */
87 static int nullstrcmp(const char *s1, const char *s2) { 90 static int nullstrcmp(const char *s1, const char *s2) {
1413 ret = -1; 1416 ret = -1;
1414 sync_error++; 1417 sync_error++;
1415 error = 1; 1418 error = 1;
1416 } 1419 }
1417 } else { 1420 } else {
1418 if(cdt && remote_resource_is_changed(sn, dir, db, res, local_res)) { 1421 int changed = remote_resource_is_changed(sn, dir, db, res, local_res);
1422 if(cdt && changed) {
1419 printf("conflict: %s\n", local_res->path); 1423 printf("conflict: %s\n", local_res->path);
1420 local_res->last_modified = 0; 1424 local_res->last_modified = 0;
1421 local_res->skipped = TRUE; 1425 local_res->skipped = TRUE;
1422 sync_skipped++; 1426 sync_skipped++;
1423 } else { 1427 } else {
2007 } 2011 }
2008 2012
2009 if(db_res->last_modified == res->last_modified && db_res->size == res->size) { 2013 if(db_res->last_modified == res->last_modified && db_res->size == res->size) {
2010 return 0; 2014 return 0;
2011 } 2015 }
2016
2017 if(db_res->parts) {
2018 // if the resource is splitted, move the part infos to the new
2019 // LocalResource obj, because we need it later
2020 res->parts = db_res->parts;
2021 res->numparts = db_res->numparts;
2022 db_res->parts = NULL;
2023 db_res->numparts = 0;
2024 }
2012 } else { 2025 } else {
2013 res->tags_updated = 1; 2026 res->tags_updated = 1;
2014 res->finfo_updated = 1; 2027 res->finfo_updated = 1;
2015 res->xattr_updated = 1; 2028 res->xattr_updated = 1;
2016 res->metadata_updated = 1; 2029 res->metadata_updated = 1;
2024 SyncDatabase *db, 2037 SyncDatabase *db,
2025 DavResource *remote, 2038 DavResource *remote,
2026 LocalResource *res) 2039 LocalResource *res)
2027 { 2040 {
2028 DavPropName properties[] = { 2041 DavPropName properties[] = {
2029 {"DAV:","getetag"}, 2042 {"DAV:", "getetag"},
2030 {DAV_NS,"tags"}, 2043 {DAV_NS, "tags"},
2031 {DAV_NS,VERSION_PATH_PROPERTY} 2044 {DAV_NS, "version-collection"},
2045 {DAV_NS, "split" }
2032 }; 2046 };
2033 int err = dav_load_prop(remote, properties, 3); 2047 int err = dav_load_prop(remote, properties, 4);
2034 2048
2035 if(res->restore) { 2049 if(res->restore) {
2036 return 0; 2050 return 0;
2037 } 2051 }
2038 2052
2054 // something weird is happening, the server must support etags 2068 // something weird is happening, the server must support etags
2055 fprintf(stderr, "Warning: resource %s has no etag\n", remote->href); 2069 fprintf(stderr, "Warning: resource %s has no etag\n", remote->href);
2056 } 2070 }
2057 } 2071 }
2058 return ret; 2072 return ret;
2073 }
2074
2075 size_t resource_get_blocksize(SyncDirectory *dir, LocalResource *local, DavResource *res, off_t filesize) {
2076 size_t local_blocksize = 0;
2077 if(local->blocksize < 0) {
2078 // file splitting disabled
2079 return 0;
2080 } else if(local->blocksize > 0) {
2081 local_blocksize = (size_t)local->blocksize;
2082 } else {
2083 UCX_FOREACH(elm, dir->splitconfig) {
2084 SplitConfig *sc = elm->data;
2085 if(sc->pattern) {
2086 if(regexec(sc->pattern, local->path, 0, NULL, 0) != 0) {
2087 continue;
2088 }
2089 }
2090
2091 if(sc->minsize > 0) {
2092 if(filesize < sc->minsize) {
2093 continue;
2094 }
2095 }
2096
2097 local_blocksize = sc->blocksize;
2098 break;
2099 }
2100 }
2101
2102 size_t svr_blocksize = 0;
2103 char *svr_blocksize_str = dav_get_string_property_ns(res, DAV_NS, "split");
2104 if(svr_blocksize_str) {
2105 uint64_t i = 0;
2106 if(util_strtouint(svr_blocksize_str, &i)) {
2107 svr_blocksize = (size_t)i;
2108 }
2109 }
2110
2111 if(local_blocksize > 0 && svr_blocksize > 0) {
2112 fprintf(stderr, "Warning: Blocksize mismatch: %s: local: %zu server: %zu\n", local->path, local_blocksize, svr_blocksize);
2113 return svr_blocksize;
2114 } else if(local_blocksize > 0) {
2115 return local_blocksize;
2116 } else if(svr_blocksize > 0) {
2117 return svr_blocksize;
2118 }
2119
2120 return 0;
2121
2059 } 2122 }
2060 2123
2061 int resource_pathlen_cmp(LocalResource *res1, LocalResource *res2, void *n) { 2124 int resource_pathlen_cmp(LocalResource *res1, LocalResource *res2, void *n) {
2062 size_t s1 = strlen(res1->path); 2125 size_t s1 = strlen(res1->path);
2063 size_t s2 = strlen(res2->path); 2126 size_t s2 = strlen(res2->path);
2601 } 2664 }
2602 local->xattr_hash = hashes.xattr; 2665 local->xattr_hash = hashes.xattr;
2603 } 2666 }
2604 } 2667 }
2605 2668
2669 // this macro is only a workaround for a netbeans bug
2670 #define LOG10 log10
2671
2672 static UcxList* upload_parts(
2673 LocalResource *local,
2674 DavResource *res,
2675 FILE *in,
2676 uint64_t filesize,
2677 size_t blocksize,
2678 uint64_t *blockcount,
2679 int *err)
2680 {
2681 // Make sure the resource is a collection. If it was a normal
2682 // resource until now, delete it and recreate it as collection
2683 if(res->exists) {
2684 if(!res->iscollection) {
2685 if(dav_delete(res)) {
2686 print_resource_error(res->session, res->path);
2687 *err = 1;
2688 return NULL;
2689 }
2690 res->exists = 0;
2691 return upload_parts(local, res, in, filesize, blocksize, blockcount, err);
2692 }
2693 } else {
2694 res->iscollection = 1;
2695 if(dav_create(res)) {
2696 print_resource_error(res->session, res->path);
2697 *err = 1;
2698 return NULL;
2699 }
2700 }
2701 res->exists = 1;
2702
2703 if(!res->href) {
2704 // this should never happen, but just make sure it doesn't crash
2705 fprintf(stderr, "href is NULL\n");
2706 *err = 1;
2707 return NULL;
2708 }
2709
2710 char *buffer = malloc(blocksize);
2711 if(!buffer) {
2712 fprintf(stderr, "Out of memory\n");
2713 *err = 1;
2714 return NULL;
2715 }
2716
2717 // calculate the maximal length of resource names
2718 // names should have all the same length and contain the block number
2719 int nblocks = filesize / blocksize;
2720 int digits = LOG10((double)nblocks) + 1;
2721 if(digits > 127) {
2722 fprintf(stderr, "Too many parts\n");
2723 *err = 1;
2724 return NULL;
2725 }
2726
2727 UcxMap *updated_parts_map = ucx_map_new((nblocks/2)+64);
2728
2729 int blockindex = 0;
2730 int uploaded_parts = 0;
2731 size_t r;
2732
2733 // temporarly disable name encryption, because we don't need it for
2734 // part names
2735 uint32_t session_flags = res->session->flags;
2736 res->session->flags ^= DAV_SESSION_ENCRYPT_NAME;
2737
2738 while((r = fread(buffer, 1, blocksize, in)) > 0) {
2739 int upload_block = 0;
2740 char *block_hash = dav_create_hash(buffer, r);
2741 if(blockindex >= local->numparts) {
2742 // we don't have a hash for this block, therefore it must be new
2743 upload_block = 1;
2744 } else {
2745 FilePart part = local->parts[blockindex];
2746 if(!strcmp(part.hash, block_hash)) {
2747 // no change
2748 free(block_hash);
2749 block_hash = NULL;
2750 } else {
2751 // block has changed
2752 upload_block = 1;
2753 }
2754 }
2755
2756 if(upload_block) {
2757 char name[128];
2758 snprintf(name, 128, "%0*d", digits, blockindex);
2759
2760 char *part_href = util_concat_path(res->href, name);
2761 DavResource *part = dav_resource_new_href(res->session, part_href);
2762 free(part_href);
2763
2764 // upload part
2765 dav_set_content_data(part, buffer, r);
2766 if(dav_store(part)) {
2767 *err = 1;
2768 print_resource_error(res->session, part->path);
2769 } else {
2770 // successfully uploaded part
2771
2772 // store the FilePart in a map
2773 // later we do a propfind and add the etag
2774 FilePart *f = calloc(1, sizeof(FilePart));
2775 f->block = blockindex;
2776 f->hash = block_hash;
2777 ucx_map_cstr_put(updated_parts_map, name, f);
2778 }
2779 dav_resource_free(part);
2780 uploaded_parts++;
2781 }
2782 if(*err) {
2783 break;
2784 }
2785 blockindex++;
2786 }
2787 *blockcount = blockindex;
2788
2789 // restore flags
2790 res->session->flags = session_flags;
2791
2792 free(buffer);
2793 if(*err) {
2794 ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free);
2795 ucx_map_free(updated_parts_map);
2796 return NULL;
2797 }
2798
2799 UcxList *updated_parts = NULL;
2800 DavResource *parts = dav_query(res->session, "select D:getetag from %s order by name", res->path);
2801 if(!parts) {
2802 print_resource_error(res->session, parts->path);
2803 *err = 1;
2804 ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free);
2805 ucx_map_free(updated_parts_map);
2806 return NULL;
2807 }
2808 DavResource *part = parts->children;
2809 while(part) {
2810 FilePart *fp = ucx_map_cstr_remove(updated_parts_map, part->name);
2811 // every part we uploaded is in the map
2812 // if we get parts that are not in the map, someone else uploaded it
2813 if(fp) {
2814 char *etag = dav_get_string_property(part, "D:getetag");
2815 if(etag) {
2816 if(strlen(etag) > 2 && etag[0] == 'W' && etag[1] == '/') {
2817 etag = etag + 2;
2818 }
2819
2820 fp->etag = strdup(etag);
2821 updated_parts = ucx_list_append(updated_parts, fp);
2822 } // else { wtf is wrong with this resource }
2823 } else {
2824 uint64_t name_partnum = 0;
2825 char *res_name = part->name;
2826 while(res_name[0] == '0' && res_name[1] != '\0') {
2827 res_name++;
2828 }
2829 DavBool delete_part = 0;
2830 if(strlen(part->name) != digits) {
2831 delete_part = 1;
2832 } else if(util_strtouint(res_name, &name_partnum)) {
2833 if(name_partnum >= blockindex) {
2834 delete_part = 1;
2835 }
2836 }
2837
2838 if(delete_part) {
2839 if(dav_delete(part)) {
2840 print_resource_error(part->session, part->path);
2841 }
2842 }
2843 }
2844 part = part->next;
2845 }
2846 dav_resource_free_all(parts);
2847
2848 ucx_map_free_content(updated_parts_map, (ucx_destructor)filepart_free);
2849 ucx_map_free(updated_parts_map);
2850
2851 *err = 0;
2852 return updated_parts;
2853 }
2854
2855 static void update_parts(LocalResource *local, UcxList *updates, uint64_t numparts) {
2856 size_t old_num = local->numparts;
2857 if(old_num > numparts) {
2858 // free old parts
2859 for(size_t i=numparts;i<old_num;i++) {
2860 FilePart p = local->parts[i];
2861 if(p.etag) {
2862 free(p.etag);
2863 }
2864 if(p.hash) {
2865 free(p.hash);
2866 }
2867 }
2868 }
2869 if(numparts != local->numparts) {
2870 local->parts = realloc(local->parts, numparts * sizeof(FilePart));
2871 local->numparts = numparts;
2872 }
2873
2874 UCX_FOREACH(elm, updates) {
2875 FilePart *p = elm->data;
2876 if(p->block > numparts) {
2877 // just make sure things don't explode in case some weird stuff
2878 // is going on
2879 continue;
2880 }
2881
2882 FilePart *old = &local->parts[p->block];
2883 if(p->block < old_num) {
2884 // cleanup existing part
2885 if(old->hash) {
2886 free(old->hash);
2887 old->hash = NULL;
2888 }
2889 if(old->etag) {
2890 free(old->etag);
2891 old->etag = NULL;
2892 }
2893 }
2894 old->block = p->block;
2895 old->hash = p->hash;
2896 old->etag = p->etag;
2897 free(p);
2898 }
2899 }
2900
2606 int sync_put_resource( 2901 int sync_put_resource(
2607 SyncDirectory *dir, 2902 SyncDirectory *dir,
2608 DavResource *res, 2903 DavResource *res,
2609 LocalResource *local, 2904 LocalResource *local,
2610 int *counter) 2905 int *counter)
2617 perror(""); 2912 perror("");
2618 free(local_path); 2913 free(local_path);
2619 return -1; 2914 return -1;
2620 } 2915 }
2621 2916
2917 size_t split_blocksize = resource_get_blocksize(dir, local, res, s.st_size);
2918
2622 FILE *in = sys_fopen(local_path, "rb"); 2919 FILE *in = sys_fopen(local_path, "rb");
2623 if(!in) { 2920 if(!in) {
2624 fprintf(stderr, "Cannot open file %s\n", local_path); 2921 fprintf(stderr, "Cannot open file %s\n", local_path);
2625 free(local_path); 2922 free(local_path);
2626 return -1; 2923 return -1;
2627 } 2924 }
2628 2925
2629 dav_set_content(res, in, (dav_read_func)myread, (dav_seek_func)file_seek); 2926 DavBool issplit = split_blocksize == 0 ? FALSE : TRUE;
2630 dav_set_content_length(res, s.st_size); 2927 int split_err = 0;
2928 UcxList *parts = NULL;
2929 uint64_t blockcount = 0;
2930 if(!issplit) {
2931 // regular file upload
2932 dav_set_content(res, in, (dav_read_func)myread, (dav_seek_func)file_seek);
2933 dav_set_content_length(res, s.st_size);
2934 } else {
2935 // splitted/partial upload
2936 parts = upload_parts(
2937 local,
2938 res,
2939 in,
2940 s.st_size,
2941 split_blocksize,
2942 &blockcount,
2943 &split_err);
2944 }
2945 if(split_err) {
2946 free(local_path);
2947 return -1;
2948 }
2631 2949
2632 MetadataHashes hashes; 2950 MetadataHashes hashes;
2633 hashes = sync_set_metadata_properties(dir, res->session, res, local); 2951 hashes = sync_set_metadata_properties(dir, res->session, res, local);
2634 2952
2635 // before sync_put_resource, remote_resource_is_changed does a propfind 2953 // before sync_put_resource, remote_resource_is_changed does a propfind
2636 // and sets res->exists 2954 // and sets res->exists
2637 int exists = res->exists; 2955 int exists = res->exists;
2638 if(dir->versioning && dir->versioning->always) { 2956 if(dir->versioning && dir->versioning->always && !issplit) {
2639 int err = versioning_begin(dir, res); 2957 int err = versioning_begin(dir, res);
2640 if(err) { 2958 if(err) {
2641 fprintf(stderr, "Cannot store version for resource: %s\n", res->href); 2959 fprintf(stderr, "Cannot store version for resource: %s\n", res->href);
2642 free(local_path); 2960 free(local_path);
2643 return -1; 2961 return -1;
2655 } 2973 }
2656 ret = 0; 2974 ret = 0;
2657 break; 2975 break;
2658 } 2976 }
2659 2977
2660 if(dir->versioning && dir->versioning->always) { 2978 if(dir->versioning && dir->versioning->always && !issplit) {
2661 if(versioning_end(dir, res)) { 2979 if(versioning_end(dir, res)) {
2662 fprintf(stderr, "Cannot checkin resource\n"); 2980 fprintf(stderr, "Cannot checkin resource\n");
2663 ret = 1; 2981 ret = 1;
2664 } 2982 }
2665 } 2983 }
2666 2984
2667 if(ret == 0) { 2985 if(ret == 0) {
2668 (*counter)++; 2986 (*counter)++;
2669 2987
2670 update_metadata_hashes(local, hashes); 2988 update_metadata_hashes(local, hashes);
2989 update_parts(local, parts, blockcount);
2671 2990
2672 // check contentlength and get new etag 2991 // check contentlength and get new etag
2673 DavResource *up_res = dav_get(res->session, res->path, "D:getetag,idav:status,idav:tags"); 2992 DavResource *up_res = dav_get(res->session, res->path, "D:getetag,idav:status");
2674 2993
2675 if(up_res) { 2994 if(up_res) {
2676 // the new content length must be equal or greater than the file size 2995 // the new content length must be equal or greater than the file size
2677 if(up_res->contentlength < s.st_size) { 2996 if(up_res->contentlength < s.st_size && !issplit) {
2678 fprintf(stderr, "Incomplete Upload: %s\n", local_path); 2997 fprintf(stderr, "Incomplete Upload: %s\n", local_path);
2679 ret = -1; 2998 ret = -1;
2680 // try to set the resource status to 'broken' 2999 // try to set the resource status to 'broken'
2681 sync_set_status(res, "broken"); 3000 sync_set_status(res, "broken");
2682 } else { 3001 } else {

mercurial