# 几百台手机加上开个多线程 就可以组成一个超强撸羊毛系统,投资的来 ^_^^_^^_^^_^
# -*- coding: utf-8 -*-
import random
import time
from appium import webdriver
from appium.webdriver.common.touch_action import TouchAction
from sqlalchemy import (Integer, String, DateTime)
from sqlalchemy import create_engine, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Pieces of the database URL, kept separate so each one is easy to spot.
_DB_PARAMS = dict(
    user="root",
    passwd="",
    host="127.0.0.1",
    port="3306",
    db_name="scrapy",
)

# SQLAlchemy URL (MySQL through the pymysql driver) handed to create_engine().
CONNECTION_STRING = (
    "mysql+pymysql://{user}:{passwd}@{host}:{port}/{db_name}?charset=utf8"
).format(**_DB_PARAMS)
# Declarative base class; the ORM model(s) in this file (e.g. Reader) derive from it.
DeclarativeBase = declarative_base()
# database init block
def db_connect():
    """Build and return a SQLAlchemy engine for ``CONNECTION_STRING``.

    Returns:
        The object returned by ``create_engine(CONNECTION_STRING)``.
    """
    db_engine = create_engine(CONNECTION_STRING)
    return db_engine
def create_table(engine):
    """Create the tables for every model declared on ``DeclarativeBase``.

    The previous body was ``pass``, so the table backing the ORM model was
    never created and the first query failed on a fresh database.
    ``create_all`` checks for existing tables first, so calling this on every
    start-up does not touch tables that are already there.

    Args:
        engine: SQLAlchemy engine (as returned by ``db_connect``) to create
            the tables on.
    """
    DeclarativeBase.metadata.create_all(engine)
class Reader(DeclarativeBase):
    """ORM model mapped to the ``reader`` table.

    scrollItem() inserts one row per article title it opens and queries this
    table by ``title`` to decide whether an article was already handled.
    """

    __tablename__ = "reader"

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Article title text, up to 500 characters.
    title = Column(name='title', type_=String(500))
    # Timestamp written when the row is created.
    createAt = Column(name='createAt', type_=DateTime())
# Module-level database setup, executed at import/start-up time.
engine = db_connect()  # engine built from CONNECTION_STRING
create_table(engine)  # table-initialisation hook, called once with the engine
session = sessionmaker(bind=engine)  # session factory bound to the engine
ses = session()  # single shared session; scrollItem() queries and commits through it
def goList(driver):
    """Navigate to the article list, retrying until the whole sequence succeeds.

    Sequence of UI steps (element texts are matched with ``contains(@text, ...)``):
      1. find the element whose text contains '输入邀请码' ("enter invitation
         code") and drag from it with ``move_to(x=0, y=500)``;
      2. tap the element containing '我的关注' ("my follows"), wait 6 s;
      3. tap the element containing '小娱爱说车' (presumably the followed
         account's name - confirm against the app), wait 3 s.

    The original retried by calling itself from a bare ``except:``, which grew
    the stack on every failure (eventually ``RecursionError``) and also
    swallowed ``KeyboardInterrupt``/``SystemExit``. This version retries in a
    loop and only catches ``Exception``, so Ctrl-C still stops the script.

    Args:
        driver: Appium driver exposing ``find_element_by_xpath``.
    """
    while True:
        try:
            invite_entry = driver.find_element_by_xpath(u"//*[contains(@text, '输入邀请码')]")
            swipe = TouchAction(driver)
            swipe.press(invite_entry).move_to(x=0, y=500).release().perform()

            follows_tab = driver.find_element_by_xpath(u"//*[contains(@text, '我的关注')]")
            follows_tab.click()
            time.sleep(6)

            account = driver.find_element_by_xpath(u"//*[contains(@text, '小娱爱说车')]")
            account.click()
            time.sleep(3)
            return
        except Exception as exc:
            # Any step may fail while the UI is still loading; start over.
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                  + ":goList retry (%r)" % (exc,))
def scrollItem(driver):
    """Open the next list entry whose title is not yet stored in ``reader``.

    Repeatedly reads the title element ``com.jifen.qukan:id/z9``. If a row
    with the same title already exists, the list is dragged by a random
    vertical offset (0-500), the function waits 2 s and looks again.
    Otherwise the title is tapped, a ``Reader`` row is committed and the
    module-level ``read_count`` is incremented once.

    Changes from the original:
      * skipping already-seen titles is a loop instead of self-recursion, so
        a long run of seen titles can no longer hit the recursion limit;
      * ``read_count`` is bumped once per article actually opened;
      * ``title.text`` is fetched once per pass instead of three times;
      * the existence check fetches at most one row (``first()``) instead of
        loading every match with ``all()``;
      * a failed commit is rolled back so the shared session stays usable.

    Args:
        driver: Appium driver exposing ``find_element_by_id``.

    Raises:
        Whatever the driver or the database session raises; the caller's
        main loop is expected to handle it.
    """
    global read_count

    while True:
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())+":next")
        title = driver.find_element_by_id('com.jifen.qukan:id/z9')
        title_text = title.text
        print(title_text)

        seen = ses.query(Reader).filter_by(title=title_text).first() is not None
        if not seen:
            break

        # Already recorded: drag the list by a random amount and look again.
        drag = TouchAction(driver)
        drag.press(title).move_to(x=0, y=random.randint(0, 500)).release().perform()
        time.sleep(2)

    ad = Reader()
    ad.title = title_text
    ad.createAt = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    ses.add(ad)
    title.click()
    try:
        ses.commit()
    except Exception:
        # Leave the shared session usable for the next iteration.
        ses.rollback()
        raise

    read_count = read_count + 1
def readArticle(driver):
    """Simulate reading the open article, then leave it.

    Finds element ``com.jifen.qukan:id/abz`` (presumably the article view -
    confirm against the app) and performs 2 x 11 press/move/release gestures
    on it, sleeping 3 s before each one (about 66 s in total). Whatever
    happens, the element ``com.jifen.qukan:id/ft`` (presumably the back/close
    button - confirm) is tapped afterwards.

    Changes from the original:
      * the bare ``except:`` is narrowed to ``except Exception`` so that
        Ctrl-C during the long sleeps is no longer swallowed (the ``finally``
        tap still runs first);
      * the two identical copy-pasted loops are expressed as one nested loop.

    Args:
        driver: Appium driver exposing ``find_element_by_id``.

    Raises:
        Whatever ``find_element_by_id``/``click`` raise in the ``finally``
        step, plus any non-``Exception`` interrupt raised while reading.
    """
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ":read")
    try:
        article = driver.find_element_by_id('com.jifen.qukan:id/abz')
        gesture = TouchAction(driver)
        for _ in range(2):
            for x in range(0, 11):
                time.sleep(3)
                # NOTE(review): under Python 3 `(1 + x) / 2` is a float offset;
                # kept as in the original - confirm the server accepts it.
                gesture.press(article).move_to(x=(1 + x) / 2, y=0).release().perform()
    except Exception:
        print('read exception')
    finally:
        # Always try to leave the article so the caller is back on the list.
        exit_button = driver.find_element_by_id('com.jifen.qukan:id/ft')
        exit_button.click()
# Number of articles opened so far; updated by scrollItem() via `global`.
read_count = 0

# Appium desired capabilities for the session created below.
desire_caps = {
    'platformName': 'Android',
    'platformVersion': '8.1.0',
    'noReset': True,
    'appPackage': 'com.jifen.qukan',
    'appActivity': 'com.jifen.qkbase.start.JumpActivity',
    'deviceName': 'xiaomi-mi_8_se-4b36aa0c',
    # Optional keyboard capabilities, disabled in the original script:
    # 'unicodeKeyboard': True,
    # 'resetKeyboard': True,
    'simpleIsVisibleCheck': True,
}

driver = webdriver.Remote('http://127.0.0.1:4723/wd/hub', desire_caps)
try:
    # Tap element `ke` (presumably the home tab - confirm against the app).
    home = driver.find_element_by_id('com.jifen.qukan:id/ke')
    home.click()

    # Dismiss the overlay `pf` when present; its absence is not an error.
    try:
        mask = driver.find_element_by_id('com.jifen.qukan:id/pf')
        mask.click()
    except Exception:
        print("no mask")

    goList(driver)

    # Open and "read" articles forever. Only Exception is caught here: the
    # original bare `except:` also swallowed KeyboardInterrupt, which made the
    # loop impossible to stop and left driver.quit() unreachable.
    while True:
        try:
            scrollItem(driver)
            readArticle(driver)
        except Exception:
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())+':exception')
finally:
    # Runs on Ctrl-C or any fatal error so the Appium session is released.
    driver.quit()