Sofian Hadiwijaya

Programming is a creative activity.

DIY - Kerekan Bendera Dikontrol via Twitter

| Comments

4 Agustus 2015, melalui pembicaraan singkat team Code4nation pada platform slack, salah satu member dari Techinasia mengemukakan ide dari CTO mereka tentang campaign acara Hackathon Merdeka yaitu membuat kerekan bendera yang dikontrol via twitter, jadi ketika ada tweet dengan hashtag tertentu maka si bendera akan dengan sendirinya naik perlahan. Hal ini disambut positif oleh mas Ainun Najib (penggagas kawalpemilu dan laporpresiden). “@sofianhw bisa upayakan IoT (Internet of Things) -nya?” ucap mas Ainun Najib. Seketika itu juga saya jawab “Bisa!!”.

Diselah-selah kesibukan mempersiapkan event Tresemme pada acara Market Museum. Saya mencoba menyiapkan bahan, pertama yang terngiang dipikiran saya yaitu stepper motor agar pegerakannya lebih persisi. Dan untuk katrolnya sendiri saya akan menggunakan timing belt printer.

Pencarianpun dimulai, dari online maupun offline. Timing belt printer mayoritas pendek, saya membutuhkan minimal satu meter. Akhirnya nemu timing belt 3d printer di tokopedia panjangnya pas banget satu meter.

Pada tanggal 13 Agustus 2015, timing belt 3d printer yang saya pesan tiba di tempat tinggal saya. Tapi saat itu saya lagi sibuk-sibuknya loading barang untuk event. Akhirnya tanggal 16 Agustus 2015, barulah saya punya kesempatan untuk menyelesaikan projek ini. Berikut barang-barang yang saya gunakan :

  • Intel Edison
  • ULN2803a
  • Stepper Motor
  • Timing belt 3d printer
  • Dudukan untuk kamera
  • Gagang Sapu
  • Kabel Tie

Pertama-tama saya akan rakit tiang benderanya dulu. Ikatkan dudukan kamera dan stepper motor pada gagang sapu menggunakan kabel tie.

pastikan timing belt tidak terlalu kencang maupun kendor.

Setelah itu marilah kita rakit rangkaian ULN dan intel edison sebagai berikut :

Mari kita sambungkan keseluruhan system

Selanjutnya mari kita putar-putar kerekannya. Berikut kode yang saya tulis menggunakan bahasa python.

countFlagSteps.pylink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import time
from wiringx86 import GPIOEdison as GPIO
gpio = GPIO(debug=False)
pin1 = 3
pin2 = 4
pin3 = 5
pin4 = 6
index = 0

print 'Setting up pin %d' % pin1
gpio.pinMode(pin1, gpio.OUTPUT)
print 'Setting up pin %d' % pin2
gpio.pinMode(pin2, gpio.OUTPUT)
print 'Setting up pin %d' % pin3
gpio.pinMode(pin3, gpio.OUTPUT)
print 'Setting up pin %d' % pin4
gpio.pinMode(pin4, gpio.OUTPUT)

print 'Go up now...'
try:
  while(True):
    gpio.digitalWrite(pin4, gpio.LOW)
    gpio.digitalWrite(pin2, gpio.HIGH)
    time.sleep(0.01)

    gpio.digitalWrite(pin1, gpio.LOW)
    gpio.digitalWrite(pin3, gpio.HIGH)
    time.sleep(0.01)

    gpio.digitalWrite(pin2, gpio.LOW)
    gpio.digitalWrite(pin4, gpio.HIGH)
    time.sleep(0.01)

    gpio.digitalWrite(pin3, gpio.LOW)
    gpio.digitalWrite(pin1, gpio.HIGH)
    time.sleep(0.01)

    index=index+1

# When you get tired of seeing the led blinking kill the loop with Ctrl-C.
except KeyboardInterrupt:
  print '\nCleaning up...'
  print 'total step %d' % index
  gpio.digitalWrite(pin1, gpio.LOW)
  gpio.digitalWrite(pin2, gpio.LOW)
  gpio.digitalWrite(pin3, gpio.LOW)
  gpio.digitalWrite(pin4, gpio.LOW)

  # Do a general cleanup. Calling this function is not mandatory.       
  gpio.cleanup()

eksekusi kode tersebut, ketika bendera sudah mencapai puncaknya tekan “Ctrl + C” untuk interrupt program, lalu program akan memberikan output berapa putaran yang diperlukan untuk mencapai puncak.

Terminal
1
$ python countFlagSteps.py

Ketika semua step dianggap sukses sekarang marilah kita hubungkan ke twitter. sebelum memulai ada baiknya kita membuat aplikasi pada twitter kita.

Kita akan memanfaatkan API Streaming untuk mendapatkan jumlah tweet secara real-time.

countTweets.pylink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import argparse
import time
from wiringx86 import GPIOEdison as GPIO
from TwitterAPI import TwitterAPI, TwitterOAuth, TwitterRestPager
gpio = GPIO(debug=False)
pin1 = 3
pin2 = 4
pin3 = 5
pin4 = 6
COUNT = 100 # search download batch size

def setup():
        print 'Setting up pin %d' % pin1
        gpio.pinMode(pin1, gpio.OUTPUT)
        print 'Setting up pin %d' % pin2
        gpio.pinMode(pin2, gpio.OUTPUT)
        print 'Setting up pin %d' % pin3
        gpio.pinMode(pin3, gpio.OUTPUT)
        print 'Setting up pin %d' % pin4
        gpio.pinMode(pin4, gpio.OUTPUT)

def puter():
        print "muter"
        for i in range(100):
                print i
                gpio.digitalWrite(pin4, gpio.LOW)
                gpio.digitalWrite(pin2, gpio.HIGH)
                time.sleep(0.01)

                gpio.digitalWrite(pin1, gpio.LOW)
                gpio.digitalWrite(pin3, gpio.HIGH)
                time.sleep(0.01)

                gpio.digitalWrite(pin2, gpio.LOW)
                gpio.digitalWrite(pin4, gpio.HIGH)
                time.sleep(0.01)

                gpio.digitalWrite(pin3, gpio.LOW)
                gpio.digitalWrite(pin1, gpio.HIGH)
                time.sleep(0.01)

def count_old_tweets(api, word_list):
        words = ' OR '.join(word_list)
        count = 0
        while True:
                pager = TwitterRestPager(api, 'search/tweets', {'q':words, 'coun
                for item in pager.get_iterator():
                        if 'text' in item:
                                count += 1
                                print(count)
                        elif 'message' in item:
                                if item['code'] == 131:
                                        continue # ignore internal server error 
                                elif item['code'] == 88:
                                        print('Suspend search until %s' % search
                                raise Exception('Message from twitter: %s' % ite


def count_new_tweets(api, word_list):
        words = ','.join(word_list)
        count = 0
        total_skip = 0
        while True:
                skip = 0
                try:
                        r = api.request('statuses/filter', {'track':words})
                        while True:
                                for item in r.get_iterator():
                                        if 'text' in item:
                                                count += 1
                                                puter()
                                                print(count + skip + total_skip)
                                        elif 'limit' in item:
                                                skip = item['limit'].get('track'
                                                #print('\n\n\n*** Skipping %d tw
                                        elif 'disconnect' in item:
                                                raise Exception('Disconnect: %s'
                except Exception as e:
                        print('*** MUST RECONNECT %s' % e)
                total_skip += skip


if __name__ == '__main__':
        parser = argparse.ArgumentParser(description='Count occurance of word(s)
        parser.add_argument('-past', action='store_true', help='search historic 
        parser.add_argument('-oauth', metavar='FILENAME', type=str, help='read O
        parser.add_argument('words', metavar='W', type=str, nargs='+', help='wor
        args = parser.parse_args()

        oauth = TwitterOAuth.read_file(args.oauth)
        api = TwitterAPI(oauth.consumer_key, oauth.consumer_secret, oauth.access

        try:
                setup()
                if args.past:
                        count_old_tweets(api, args.words)
                else:
                        count_new_tweets(api, args.words)
        except KeyboardInterrupt:
                print('\nTerminated by user\n')
                gpio.digitalWrite(pin1, gpio.LOW)
                gpio.digitalWrite(pin2, gpio.LOW)
                gpio.digitalWrite(pin3, gpio.LOW)
                gpio.digitalWrite(pin4, gpio.LOW)

                # Do a general cleanup. Calling this function is not mandatory. 
                gpio.cleanup()
        except Exception as e:
                print('*** STOPPED %s\n' % e)
token_key.txtlink
1
2
3
4
consumer_key=
consumer_secret=
access_token_key=
access_token_secret=

mari kita test apakah kode yang kita buat bekerja.

Terminal
1
$ python countTweets.py -oauth token_key.txt [kata]

Berikut video pada saat 17 Agustusan yang sudah dipercepat, soalnya kemaren butuh 2jam.

Semua source code saya upload ke account github saya sofianhw/flagpole.

Comments