source: mediastreamer2/src/msticker.c @ 1344:10d074197822

Last change on this file since 1344:10d074197822 was 1344:10d074197822, checked in by Simon Morlat <simon.morlat@…>, 2 years ago

i18n of mediastreamer2

File size: 11.9 KB
Line 
1/*
2mediastreamer2 library - modular sound and video processing and streaming
3Copyright (C) 2006  Simon MORLAT (simon.morlat@linphone.org)
4
5This program is free software; you can redistribute it and/or
6modify it under the terms of the GNU General Public License
7as published by the Free Software Foundation; either version 2
8of the License, or (at your option) any later version.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
18*/
19
20#include "mediastreamer2/msticker.h"
21
22
23void * ms_ticker_run(void *s);
24static uint64_t get_cur_time(void *);
25
26void ms_ticker_start(MSTicker *s){
27        s->run=TRUE;
28        ms_thread_create(&s->thread,NULL,ms_ticker_run,s);
29}
30
31
32void ms_ticker_init(MSTicker *ticker)
33{
34        ms_mutex_init(&ticker->lock,NULL);
35        ticker->execution_list=NULL;
36        ticker->ticks=1;
37        ticker->time=0;
38        ticker->interval=10;
39        ticker->run=FALSE;
40        ticker->exec_id=0;
41        ticker->get_cur_time_ptr=&get_cur_time;
42        ticker->get_cur_time_data=NULL;
43#ifdef WIN32_TIMERS
44        ticker->TimeEvent=NULL;
45#endif
46        ticker->name=ms_strdup("MSTicker");
47        ms_ticker_start(ticker);
48}
49
50MSTicker *ms_ticker_new(){
51        MSTicker *obj=(MSTicker *)ms_new(MSTicker,1);
52        ms_ticker_init(obj);
53        return obj;
54}
55
56void ms_ticker_stop(MSTicker *s){
57        ms_mutex_lock(&s->lock);
58        s->run=FALSE;
59        ms_mutex_unlock(&s->lock);
60        if(s->thread)
61                ms_thread_join(s->thread,NULL);
62}
63
64void ms_ticker_set_name(MSTicker *s, const char *name){
65        if (s->name) ms_free(s->name);
66        s->name=ms_strdup(name);
67}
68
69void ms_ticker_uninit(MSTicker *ticker)
70{
71        ms_ticker_stop(ticker);
72        ms_free(ticker->name);
73        ms_mutex_destroy(&ticker->lock);
74}
75
76void ms_ticker_destroy(MSTicker *ticker){
77        ms_ticker_uninit(ticker);
78        ms_free(ticker);
79}
80
81
82static MSList *get_sources(MSList *filters){
83        MSList *sources=NULL;
84        MSFilter *f;
85        for(;filters!=NULL;filters=filters->next){
86                f=(MSFilter*)filters->data;
87                if (f->desc->ninputs==0){
88                        sources=ms_list_append(sources,f);
89                }
90        }
91        return sources;
92}
93
94int ms_ticker_attach(MSTicker *ticker,MSFilter *f)
95{
96        MSList *sources=NULL;
97        MSList *filters=NULL;
98        MSList *it;
99       
100        if (f->ticker!=NULL) {
101                ms_message("Filter %s is already being scheduled; nothing to do.",f->desc->name);
102                return 0;
103        }
104
105        filters=ms_filter_find_neighbours(f);
106        sources=get_sources(filters);
107        if (sources==NULL){
108                ms_fatal("No sources found around filter %s",f->desc->name);
109                ms_list_free(filters);
110                return -1;
111        }
112        /*run preprocess on each filter: */
113        for(it=filters;it!=NULL;it=it->next)
114                ms_filter_preprocess((MSFilter*)it->data,ticker);
115        ms_mutex_lock(&ticker->lock);
116        ticker->execution_list=ms_list_concat(ticker->execution_list,sources);
117        ms_mutex_unlock(&ticker->lock);
118        ms_list_free(filters);
119        return 0;
120}
121
122
123
124int ms_ticker_detach(MSTicker *ticker,MSFilter *f){
125        MSList *sources=NULL;
126        MSList *filters=NULL;
127        MSList *it;
128
129        if (f->ticker==NULL) {
130                ms_message("Filter %s is not scheduled; nothing to do.",f->desc->name);
131                return 0;
132        }
133
134        ms_mutex_lock(&ticker->lock);
135
136        filters=ms_filter_find_neighbours(f);
137        sources=get_sources(filters);
138        if (sources==NULL){
139                ms_fatal("No sources found around filter %s",f->desc->name);
140                ms_list_free(filters);
141                ms_mutex_unlock(&ticker->lock);
142                return -1;
143        }
144
145        for(it=sources;it!=NULL;it=ms_list_next(it)){
146                ticker->execution_list=ms_list_remove(ticker->execution_list,it->data);
147        }
148        ms_mutex_unlock(&ticker->lock);
149        ms_list_for_each(filters,(void (*)(void*))ms_filter_postprocess);
150        ms_list_free(filters);
151        ms_list_free(sources);
152        return 0;
153}
154
155
156static bool_t filter_can_process(MSFilter *f, int tick){
157        /* look if filters before this one have run */
158        int i;
159        MSQueue *l;
160        for(i=0;i<f->desc->ninputs;i++){
161                l=f->inputs[i];
162                if (l!=NULL){
163                        if (l->prev.filter->last_tick!=tick) return FALSE;
164                }
165        }
166        return TRUE;
167}
168
169static void call_process(MSFilter *f){
170        bool_t process_done=FALSE;
171        if (f->desc->ninputs==0 || f->desc->flags & MS_FILTER_IS_PUMP){
172                ms_filter_process(f);
173        }else{
174                while (ms_filter_inputs_have_data(f)) {
175                        if (process_done){
176                                ms_warning("Re-scheduling filter %s: all data should be consumed in one process call, so fix it.",f->desc->name);
177                        }
178                        ms_filter_process(f);
179                        process_done=TRUE;
180                }
181        }
182}
183
184static void run_graph(MSFilter *f, MSTicker *s, MSList **unschedulable, bool_t force_schedule){
185        int i;
186        MSQueue *l;
187        if (f->last_tick!=s->ticks ){
188                if (filter_can_process(f,s->ticks) || force_schedule) {
189                        /* this is a candidate */
190                        f->last_tick=s->ticks;
191                        call_process(f);       
192                        /* now recurse to next filters */               
193                        for(i=0;i<f->desc->noutputs;i++){
194                                l=f->outputs[i];
195                                if (l!=NULL){
196                                        run_graph(l->next.filter,s,unschedulable, force_schedule);
197                                }
198                        }
199                }else{
200                        /* this filter has not all inputs that have been filled by filters before it. */
201                        *unschedulable=ms_list_prepend(*unschedulable,f);
202                }
203        }
204}
205
206static void run_graphs(MSTicker *s, MSList *execution_list, bool_t force_schedule){
207        MSList *it;
208        MSList *unschedulable=NULL;
209        for(it=execution_list;it!=NULL;it=it->next){
210                run_graph((MSFilter*)it->data,s,&unschedulable,force_schedule);
211        }
212        /* filters that are part of a loop haven't been called in process() because one of their input refers to a filter that could not be scheduled (because they could not be scheduled themselves)... Do you understand ?*/
213        /* we resolve this by simply assuming that they must be called anyway
214        for the loop to run correctly*/
215        /* we just recall run_graphs on them, as if they were source filters */
216        if (unschedulable!=NULL) {
217                run_graphs(s,unschedulable,TRUE);
218                ms_list_free(unschedulable);
219        }
220}
221
222#ifdef __MACH__
223#include <sys/types.h>
224#include <sys/timeb.h>
225#endif
226
227static uint64_t get_cur_time(void *unused){
228#if defined(_WIN32_WCE)
229        DWORD timemillis = GetTickCount();
230        return timemillis;
231#elif defined(WIN32)
232        return timeGetTime() ;
233#elif defined(__MACH__) && defined(__GNUC__) && (__GNUC__ >= 3)
234        struct timeval tv;
235        gettimeofday(&tv, NULL);
236        return (tv.tv_sec*1000LL) + ((tv.tv_usec+500LL)/1000LL);
237#elif defined(__MACH__)
238        struct timespec ts;
239        struct timeb time_val;
240       
241        ftime (&time_val);
242        ts.tv_sec = time_val.time;
243        ts.tv_nsec = time_val.millitm * 1000000;
244        return (ts.tv_sec*1000LL) + ((ts.tv_nsec+500000LL)/1000000LL);
245#else
246        struct timespec ts;
247        if (clock_gettime(CLOCK_MONOTONIC,&ts)<0){
248                ms_fatal("clock_gettime() doesn't work: %s",strerror(errno));
249        }
250        return (ts.tv_sec*1000LL) + ((ts.tv_nsec+500000LL)/1000000LL);
251#endif
252}
253
254static void sleepMs(int ms){
255#ifdef WIN32
256        Sleep(ms);
257#else
258        struct timespec ts;
259        ts.tv_sec=0;
260        ts.tv_nsec=ms*1000000LL;
261        nanosleep(&ts,NULL);
262#endif
263}
264
265static int set_high_prio(void){
266        int precision=2;
267        int result=0;
268#ifdef WIN32
269        MMRESULT mm;
270        TIMECAPS ptc;
271        mm=timeGetDevCaps(&ptc,sizeof(ptc));
272        if (mm==0){
273                if (ptc.wPeriodMin<(UINT)precision)
274                        ptc.wPeriodMin=precision;
275                else
276                        precision = ptc.wPeriodMin;
277                mm=timeBeginPeriod(ptc.wPeriodMin);
278                if (mm!=TIMERR_NOERROR){
279                        ms_warning("timeBeginPeriod failed.");
280                }
281                ms_message("win32 timer resolution set to %i ms",ptc.wPeriodMin);
282        }else{
283                ms_warning("timeGetDevCaps failed.");
284        }
285
286        if(!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST)){
287                ms_warning("SetThreadPriority() failed (%d)\n", GetLastError());
288        }
289#else
290        struct sched_param param;
291        memset(&param,0,sizeof(param));
292#ifdef TARGET_OS_MAC
293        int policy=SCHED_RR;
294#else
295        int policy=SCHED_OTHER;
296#endif
297        param.sched_priority=sched_get_priority_max(policy);
298        if((result=pthread_setschedparam(pthread_self(),policy, &param))) {
299                ms_warning("Set sched param failed with error code(%i)\n",result);
300        } else {
301                ms_message("MS ticker priority set to max (%i)",param.sched_priority);
302        }
303#endif
304        return precision;
305}
306
307static void unset_high_prio(int precision){
308#ifdef WIN32
309        MMRESULT mm;
310
311        if(!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_NORMAL)){
312                ms_warning("SetThreadPriority() failed (%d)\n", GetLastError());
313        }
314
315        mm=timeEndPeriod(precision);
316#endif
317}
318
319#ifndef WIN32_TIMERS
320
321void * ms_ticker_run(void *arg)
322{
323        uint64_t realtime;
324        int64_t diff;
325        MSTicker *s=(MSTicker*)arg;
326        int lastlate=0;
327        int precision=2;
328        int late;
329       
330        precision = set_high_prio();
331
332
333        s->ticks=1;
334        ms_mutex_lock(&s->lock);
335        s->orig=s->get_cur_time_ptr(s->get_cur_time_data);
336
337        while(s->run){
338                s->ticks++;
339                run_graphs(s,s->execution_list,FALSE);
340                s->time+=s->interval;
341                while(1){
342                        realtime=s->get_cur_time_ptr(s->get_cur_time_data)-s->orig;
343                        ms_mutex_unlock(&s->lock);
344                        diff=s->time-realtime;
345                        if (diff>0){
346                                /* sleep until next tick */
347                                sleepMs((int)diff);
348                        }else{
349                                late=(int)-diff;
350                                if (late>s->interval*5 && late>lastlate){
351                                        ms_warning("%s: We are late of %d miliseconds.",s->name,late);
352                                }
353                                lastlate=late;
354                                break; /*exit the while loop */
355                        }
356                        ms_mutex_lock(&s->lock);
357                }
358                ms_mutex_lock(&s->lock);
359        }
360        ms_mutex_unlock(&s->lock);
361        unset_high_prio(precision);
362        ms_message("%s thread exiting",s->name);
363
364        ms_thread_exit(NULL);
365        return NULL;
366}
367
368#else
369
370void * ms_ticker_run(void *arg)
371{
372        MSTicker *s=(MSTicker*)arg;
373        uint64_t realtime;
374        int precision=2;
375        UINT timerId;
376
377        precision = set_high_prio();
378
379        s->TimeEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
380
381        s->ticks=1;
382        ms_mutex_lock(&s->lock);
383        s->orig=s->get_cur_time_ptr(s->get_cur_time_data);
384
385        timerId = timeSetEvent (s->interval, precision, (LPTIMECALLBACK)s->TimeEvent, 0,
386                                  TIME_PERIODIC | TIME_CALLBACK_EVENT_SET);
387        while(s->run){
388                DWORD err;
389
390                s->ticks++;
391                run_graphs(s,s->execution_list,FALSE);
392
393                /* elapsed time since origin */
394                s->time = s->get_cur_time_ptr(s->get_cur_time_data)- s->orig;
395
396                ms_mutex_unlock(&s->lock);
397                err = WaitForSingleObject (s->TimeEvent, s->interval*1000 ); /* wake up each diff */
398                if (err==WAIT_FAILED)
399                        ms_message("WaitForSingleObject is failing");
400
401                ms_mutex_lock(&s->lock);
402        }
403        ms_mutex_unlock(&s->lock);
404        timeKillEvent (timerId);
405        CloseHandle (s->TimeEvent);
406        s->TimeEvent=NULL;
407        unset_high_prio(precision);
408        ms_message("MSTicker thread exiting");
409        ms_thread_exit(NULL);
410        return NULL;
411}
412
413#endif
414
415void ms_ticker_set_time_func(MSTicker *ticker, MSTickerTimeFunc func, void *user_data){
416        if (func==NULL) func=get_cur_time;
417        /*ms_mutex_lock(&ticker->lock);*/
418        ticker->get_cur_time_ptr=func;
419        ticker->get_cur_time_data=user_data;
420        /*re-set the origin to take in account that previous function ptr and the
421        new one may return different times*/
422        ticker->orig=func(user_data)-ticker->time;
423        /*ms_mutex_unlock(&ticker->lock);*/
424        ms_message("ms_ticker_set_time_func: ticker updated.");
425}
426
427static void print_graph(MSFilter *f, MSTicker *s, MSList **unschedulable, bool_t force_schedule){
428        int i;
429        MSQueue *l;
430        if (f->last_tick!=s->ticks ){
431                if (filter_can_process(f,s->ticks) || force_schedule) {
432                        /* this is a candidate */
433                        f->last_tick=s->ticks;
434                        ms_message("print_graphs: %s", f->desc->name);
435                        /* now recurse to next filters */               
436                        for(i=0;i<f->desc->noutputs;i++){
437                                l=f->outputs[i];
438                                if (l!=NULL){
439                                        print_graph(l->next.filter,s,unschedulable, force_schedule);
440                                }
441                        }
442                }else{
443                        /* this filter has not all inputs that have been filled by filters before it. */
444                        *unschedulable=ms_list_prepend(*unschedulable,f);
445                }
446        }
447}
448
449static void print_graphs(MSTicker *s, MSList *execution_list, bool_t force_schedule){
450        MSList *it;
451        MSList *unschedulable=NULL;
452        for(it=execution_list;it!=NULL;it=it->next){
453                print_graph((MSFilter*)it->data,s,&unschedulable,force_schedule);
454        }
455        /* filters that are part of a loop haven't been called in process() because one of their input refers to a filter that could not be scheduled (because they could not be scheduled themselves)... Do you understand ?*/
456        /* we resolve this by simply assuming that they must be called anyway
457        for the loop to run correctly*/
458        /* we just recall run_graphs on them, as if they were source filters */
459        if (unschedulable!=NULL) {
460                print_graphs(s,unschedulable,TRUE);
461                ms_list_free(unschedulable);
462        }
463}
464
465void ms_ticker_print_graphs(MSTicker *ticker){
466        print_graphs(ticker,ticker->execution_list,FALSE);
467}
Note: See TracBrowser for help on using the repository browser.