source: mediastreamer2/src/msticker.c @ 1343:dbe89bf88eca

Last change on this file since 1343:dbe89bf88eca was 1305:2887e2cf936b, checked in by Simon Morlat <simon.morlat@…>, 2 years ago

msticker better time resolution

File size: 11.9 KB
Line 
1/*
2mediastreamer2 library - modular sound and video processing and streaming
3Copyright (C) 2006  Simon MORLAT (simon.morlat@linphone.org)
4
5This program is free software; you can redistribute it and/or
6modify it under the terms of the GNU General Public License
7as published by the Free Software Foundation; either version 2
8of the License, or (at your option) any later version.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
18*/
19
20
21
22#include "mediastreamer2/msticker.h"
23
24
25void * ms_ticker_run(void *s);
26static uint64_t get_cur_time(void *);
27
28void ms_ticker_start(MSTicker *s){
29        s->run=TRUE;
30        ms_thread_create(&s->thread,NULL,ms_ticker_run,s);
31}
32
33
34void ms_ticker_init(MSTicker *ticker)
35{
36        ms_mutex_init(&ticker->lock,NULL);
37        ticker->execution_list=NULL;
38        ticker->ticks=1;
39        ticker->time=0;
40        ticker->interval=10;
41        ticker->run=FALSE;
42        ticker->exec_id=0;
43        ticker->get_cur_time_ptr=&get_cur_time;
44        ticker->get_cur_time_data=NULL;
45#ifdef WIN32_TIMERS
46        ticker->TimeEvent=NULL;
47#endif
48        ticker->name=ms_strdup("MSTicker");
49        ms_ticker_start(ticker);
50}
51
52MSTicker *ms_ticker_new(){
53        MSTicker *obj=(MSTicker *)ms_new(MSTicker,1);
54        ms_ticker_init(obj);
55        return obj;
56}
57
58void ms_ticker_stop(MSTicker *s){
59        ms_mutex_lock(&s->lock);
60        s->run=FALSE;
61        ms_mutex_unlock(&s->lock);
62        if(s->thread)
63                ms_thread_join(s->thread,NULL);
64}
65
66void ms_ticker_set_name(MSTicker *s, const char *name){
67        if (s->name) ms_free(s->name);
68        s->name=ms_strdup(name);
69}
70
71void ms_ticker_uninit(MSTicker *ticker)
72{
73        ms_ticker_stop(ticker);
74        ms_free(ticker->name);
75        ms_mutex_destroy(&ticker->lock);
76}
77
78void ms_ticker_destroy(MSTicker *ticker){
79        ms_ticker_uninit(ticker);
80        ms_free(ticker);
81}
82
83
84static MSList *get_sources(MSList *filters){
85        MSList *sources=NULL;
86        MSFilter *f;
87        for(;filters!=NULL;filters=filters->next){
88                f=(MSFilter*)filters->data;
89                if (f->desc->ninputs==0){
90                        sources=ms_list_append(sources,f);
91                }
92        }
93        return sources;
94}
95
96int ms_ticker_attach(MSTicker *ticker,MSFilter *f)
97{
98        MSList *sources=NULL;
99        MSList *filters=NULL;
100        MSList *it;
101       
102        if (f->ticker!=NULL) {
103                ms_message("Filter %s is already being scheduled; nothing to do.",f->desc->name);
104                return 0;
105        }
106
107        filters=ms_filter_find_neighbours(f);
108        sources=get_sources(filters);
109        if (sources==NULL){
110                ms_fatal("No sources found around filter %s",f->desc->name);
111                ms_list_free(filters);
112                return -1;
113        }
114        /*run preprocess on each filter: */
115        for(it=filters;it!=NULL;it=it->next)
116                ms_filter_preprocess((MSFilter*)it->data,ticker);
117        ms_mutex_lock(&ticker->lock);
118        ticker->execution_list=ms_list_concat(ticker->execution_list,sources);
119        ms_mutex_unlock(&ticker->lock);
120        ms_list_free(filters);
121        return 0;
122}
123
124
125
126int ms_ticker_detach(MSTicker *ticker,MSFilter *f){
127        MSList *sources=NULL;
128        MSList *filters=NULL;
129        MSList *it;
130
131        if (f->ticker==NULL) {
132                ms_message("Filter %s is not scheduled; nothing to do.",f->desc->name);
133                return 0;
134        }
135
136        ms_mutex_lock(&ticker->lock);
137
138        filters=ms_filter_find_neighbours(f);
139        sources=get_sources(filters);
140        if (sources==NULL){
141                ms_fatal("No sources found around filter %s",f->desc->name);
142                ms_list_free(filters);
143                ms_mutex_unlock(&ticker->lock);
144                return -1;
145        }
146
147        for(it=sources;it!=NULL;it=ms_list_next(it)){
148                ticker->execution_list=ms_list_remove(ticker->execution_list,it->data);
149        }
150        ms_mutex_unlock(&ticker->lock);
151        ms_list_for_each(filters,(void (*)(void*))ms_filter_postprocess);
152        ms_list_free(filters);
153        ms_list_free(sources);
154        return 0;
155}
156
157
158static bool_t filter_can_process(MSFilter *f, int tick){
159        /* look if filters before this one have run */
160        int i;
161        MSQueue *l;
162        for(i=0;i<f->desc->ninputs;i++){
163                l=f->inputs[i];
164                if (l!=NULL){
165                        if (l->prev.filter->last_tick!=tick) return FALSE;
166                }
167        }
168        return TRUE;
169}
170
171static void call_process(MSFilter *f){
172        bool_t process_done=FALSE;
173        if (f->desc->ninputs==0 || f->desc->flags & MS_FILTER_IS_PUMP){
174                ms_filter_process(f);
175        }else{
176                while (ms_filter_inputs_have_data(f)) {
177                        if (process_done){
178                                ms_warning("Re-scheduling filter %s: all data should be consumed in one process call, so fix it.",f->desc->name);
179                        }
180                        ms_filter_process(f);
181                        process_done=TRUE;
182                }
183        }
184}
185
186static void run_graph(MSFilter *f, MSTicker *s, MSList **unschedulable, bool_t force_schedule){
187        int i;
188        MSQueue *l;
189        if (f->last_tick!=s->ticks ){
190                if (filter_can_process(f,s->ticks) || force_schedule) {
191                        /* this is a candidate */
192                        f->last_tick=s->ticks;
193                        call_process(f);       
194                        /* now recurse to next filters */               
195                        for(i=0;i<f->desc->noutputs;i++){
196                                l=f->outputs[i];
197                                if (l!=NULL){
198                                        run_graph(l->next.filter,s,unschedulable, force_schedule);
199                                }
200                        }
201                }else{
202                        /* this filter has not all inputs that have been filled by filters before it. */
203                        *unschedulable=ms_list_prepend(*unschedulable,f);
204                }
205        }
206}
207
208static void run_graphs(MSTicker *s, MSList *execution_list, bool_t force_schedule){
209        MSList *it;
210        MSList *unschedulable=NULL;
211        for(it=execution_list;it!=NULL;it=it->next){
212                run_graph((MSFilter*)it->data,s,&unschedulable,force_schedule);
213        }
214        /* filters that are part of a loop haven't been called in process() because one of their input refers to a filter that could not be scheduled (because they could not be scheduled themselves)... Do you understand ?*/
215        /* we resolve this by simply assuming that they must be called anyway
216        for the loop to run correctly*/
217        /* we just recall run_graphs on them, as if they were source filters */
218        if (unschedulable!=NULL) {
219                run_graphs(s,unschedulable,TRUE);
220                ms_list_free(unschedulable);
221        }
222}
223
224#ifdef __MACH__
225#include <sys/types.h>
226#include <sys/timeb.h>
227#endif
228
229static uint64_t get_cur_time(void *unused){
230#if defined(_WIN32_WCE)
231        DWORD timemillis = GetTickCount();
232        return timemillis;
233#elif defined(WIN32)
234        return timeGetTime() ;
235#elif defined(__MACH__) && defined(__GNUC__) && (__GNUC__ >= 3)
236        struct timeval tv;
237        gettimeofday(&tv, NULL);
238        return (tv.tv_sec*1000LL) + ((tv.tv_usec+500LL)/1000LL);
239#elif defined(__MACH__)
240        struct timespec ts;
241        struct timeb time_val;
242       
243        ftime (&time_val);
244        ts.tv_sec = time_val.time;
245        ts.tv_nsec = time_val.millitm * 1000000;
246        return (ts.tv_sec*1000LL) + ((ts.tv_nsec+500000LL)/1000000LL);
247#else
248        struct timespec ts;
249        if (clock_gettime(CLOCK_MONOTONIC,&ts)<0){
250                ms_fatal("clock_gettime() doesn't work: %s",strerror(errno));
251        }
252        return (ts.tv_sec*1000LL) + ((ts.tv_nsec+500000LL)/1000000LL);
253#endif
254}
255
256static void sleepMs(int ms){
257#ifdef WIN32
258        Sleep(ms);
259#else
260        struct timespec ts;
261        ts.tv_sec=0;
262        ts.tv_nsec=ms*1000000LL;
263        nanosleep(&ts,NULL);
264#endif
265}
266
267static int set_high_prio(void){
268        int precision=2;
269        int result=0;
270#ifdef WIN32
271        MMRESULT mm;
272        TIMECAPS ptc;
273        mm=timeGetDevCaps(&ptc,sizeof(ptc));
274        if (mm==0){
275                if (ptc.wPeriodMin<(UINT)precision)
276                        ptc.wPeriodMin=precision;
277                else
278                        precision = ptc.wPeriodMin;
279                mm=timeBeginPeriod(ptc.wPeriodMin);
280                if (mm!=TIMERR_NOERROR){
281                        ms_warning("timeBeginPeriod failed.");
282                }
283                ms_message("win32 timer resolution set to %i ms",ptc.wPeriodMin);
284        }else{
285                ms_warning("timeGetDevCaps failed.");
286        }
287
288        if(!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST)){
289                ms_warning("SetThreadPriority() failed (%d)\n", GetLastError());
290        }
291#else
292        struct sched_param param;
293        memset(&param,0,sizeof(param));
294#ifdef TARGET_OS_MAC
295        int policy=SCHED_RR;
296#else
297        int policy=SCHED_OTHER;
298#endif
299        param.sched_priority=sched_get_priority_max(policy);
300        if((result=pthread_setschedparam(pthread_self(),policy, &param))) {
301                ms_warning("Set sched param failed with error code(%i)\n",result);
302        } else {
303                ms_message("MS ticker priority set to max (%i)",param.sched_priority);
304        }
305#endif
306        return precision;
307}
308
309static void unset_high_prio(int precision){
310#ifdef WIN32
311        MMRESULT mm;
312
313        if(!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_NORMAL)){
314                ms_warning("SetThreadPriority() failed (%d)\n", GetLastError());
315        }
316
317        mm=timeEndPeriod(precision);
318#endif
319}
320
321#ifndef WIN32_TIMERS
322
323void * ms_ticker_run(void *arg)
324{
325        uint64_t realtime;
326        int64_t diff;
327        MSTicker *s=(MSTicker*)arg;
328        int lastlate=0;
329        int precision=2;
330        int late;
331       
332        precision = set_high_prio();
333
334
335        s->ticks=1;
336        ms_mutex_lock(&s->lock);
337        s->orig=s->get_cur_time_ptr(s->get_cur_time_data);
338
339        while(s->run){
340                s->ticks++;
341                run_graphs(s,s->execution_list,FALSE);
342                s->time+=s->interval;
343                while(1){
344                        realtime=s->get_cur_time_ptr(s->get_cur_time_data)-s->orig;
345                        ms_mutex_unlock(&s->lock);
346                        diff=s->time-realtime;
347                        if (diff>0){
348                                /* sleep until next tick */
349                                sleepMs((int)diff);
350                        }else{
351                                late=(int)-diff;
352                                if (late>s->interval*5 && late>lastlate){
353                                        ms_warning("%s: We are late of %d miliseconds.",s->name,late);
354                                }
355                                lastlate=late;
356                                break; /*exit the while loop */
357                        }
358                        ms_mutex_lock(&s->lock);
359                }
360                ms_mutex_lock(&s->lock);
361        }
362        ms_mutex_unlock(&s->lock);
363        unset_high_prio(precision);
364        ms_message("%s thread exiting",s->name);
365
366        ms_thread_exit(NULL);
367        return NULL;
368}
369
370#else
371
372void * ms_ticker_run(void *arg)
373{
374        MSTicker *s=(MSTicker*)arg;
375        uint64_t realtime;
376        int precision=2;
377        UINT timerId;
378
379        precision = set_high_prio();
380
381        s->TimeEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
382
383        s->ticks=1;
384        ms_mutex_lock(&s->lock);
385        s->orig=s->get_cur_time_ptr(s->get_cur_time_data);
386
387        timerId = timeSetEvent (s->interval, precision, (LPTIMECALLBACK)s->TimeEvent, 0,
388                                  TIME_PERIODIC | TIME_CALLBACK_EVENT_SET);
389        while(s->run){
390                DWORD err;
391
392                s->ticks++;
393                run_graphs(s,s->execution_list,FALSE);
394
395                /* elapsed time since origin */
396                s->time = s->get_cur_time_ptr(s->get_cur_time_data)- s->orig;
397
398                ms_mutex_unlock(&s->lock);
399                err = WaitForSingleObject (s->TimeEvent, s->interval*1000 ); /* wake up each diff */
400                if (err==WAIT_FAILED)
401                        ms_message("WaitForSingleObject is failing");
402
403                ms_mutex_lock(&s->lock);
404        }
405        ms_mutex_unlock(&s->lock);
406        timeKillEvent (timerId);
407        CloseHandle (s->TimeEvent);
408        s->TimeEvent=NULL;
409        unset_high_prio(precision);
410        ms_message("MSTicker thread exiting");
411        ms_thread_exit(NULL);
412        return NULL;
413}
414
415#endif
416
417void ms_ticker_set_time_func(MSTicker *ticker, MSTickerTimeFunc func, void *user_data){
418        if (func==NULL) func=get_cur_time;
419        /*ms_mutex_lock(&ticker->lock);*/
420        ticker->get_cur_time_ptr=func;
421        ticker->get_cur_time_data=user_data;
422        /*re-set the origin to take in account that previous function ptr and the
423        new one may return different times*/
424        ticker->orig=func(user_data)-ticker->time;
425        /*ms_mutex_unlock(&ticker->lock);*/
426        ms_message("ms_ticker_set_time_func: ticker updated.");
427}
428
429static void print_graph(MSFilter *f, MSTicker *s, MSList **unschedulable, bool_t force_schedule){
430        int i;
431        MSQueue *l;
432        if (f->last_tick!=s->ticks ){
433                if (filter_can_process(f,s->ticks) || force_schedule) {
434                        /* this is a candidate */
435                        f->last_tick=s->ticks;
436                        ms_message("print_graphs: %s", f->desc->name);
437                        /* now recurse to next filters */               
438                        for(i=0;i<f->desc->noutputs;i++){
439                                l=f->outputs[i];
440                                if (l!=NULL){
441                                        print_graph(l->next.filter,s,unschedulable, force_schedule);
442                                }
443                        }
444                }else{
445                        /* this filter has not all inputs that have been filled by filters before it. */
446                        *unschedulable=ms_list_prepend(*unschedulable,f);
447                }
448        }
449}
450
451static void print_graphs(MSTicker *s, MSList *execution_list, bool_t force_schedule){
452        MSList *it;
453        MSList *unschedulable=NULL;
454        for(it=execution_list;it!=NULL;it=it->next){
455                print_graph((MSFilter*)it->data,s,&unschedulable,force_schedule);
456        }
457        /* filters that are part of a loop haven't been called in process() because one of their input refers to a filter that could not be scheduled (because they could not be scheduled themselves)... Do you understand ?*/
458        /* we resolve this by simply assuming that they must be called anyway
459        for the loop to run correctly*/
460        /* we just recall run_graphs on them, as if they were source filters */
461        if (unschedulable!=NULL) {
462                print_graphs(s,unschedulable,TRUE);
463                ms_list_free(unschedulable);
464        }
465}
466
467void ms_ticker_print_graphs(MSTicker *ticker){
468        print_graphs(ticker,ticker->execution_list,FALSE);
469}
Note: See TracBrowser for help on using the repository browser.